You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/30 03:13:07 UTC

[pulsar] branch master updated: Use randomized names for kafka container in integration tests (#3412)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cc3eb7f  Use randomized names for kafka container in integration tests (#3412)
cc3eb7f is described below

commit cc3eb7f44b8ac884e83d9f0f127c216edd897d9d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 29 19:13:03 2019 -0800

    Use randomized names for kafka container in integration tests (#3412)
    
    * Use randomized names for kafka container in integration testss
    
    * Revert change in KafkaSourceTester
    
    * One more fix for source container names
    
    * Use different name for different tests
---
 .../integration/functions/PulsarFunctionsTest.java    |  3 ++-
 .../pulsar/tests/integration/io/KafkaSinkTester.java  | 19 ++++++++++---------
 .../tests/integration/io/KafkaSourceTester.java       |  8 ++++----
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 23b7259..18eaba9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -73,7 +73,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
     @Test
     public void testKafkaSink() throws Exception {
-        testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
+        String kafkaContainerName = "kafka-" + randomName(8);
+        testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName));
     }
 
     @Test(enabled = false)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 6713cc1..9077146 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.io;
 
 import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
+import static org.apache.pulsar.tests.integration.topologies.PulsarTestBase.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -42,17 +43,18 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 @Slf4j
 public class KafkaSinkTester extends SinkTester<KafkaContainer> {
 
-    private static final String NAME = "kafka";
-
     private final String kafkaTopicName;
     private KafkaConsumer<String, String> kafkaConsumer;
 
-    public KafkaSinkTester() {
-        super(NAME, SinkType.KAFKA);
+    private final String containerName;
+
+      public KafkaSinkTester(String containerName) {
+        super(containerName, SinkType.KAFKA);
+        this.containerName = containerName;
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_sink_topic_" + suffix;
 
-        sinkConfig.put("bootstrapServers", NAME + ":9092");
+        sinkConfig.put("bootstrapServers", networkAlias + ":9092");
         sinkConfig.put("acks", "all");
         sinkConfig.put("batchSize", 1L);
         sinkConfig.put("maxRequestSize", 1048576L);
@@ -61,13 +63,12 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
 
     @Override
     protected KafkaContainer createSinkService(PulsarCluster cluster) {
-        final String kafkaServiceName = NAME;
         return new KafkaContainer()
                 .withEmbeddedZookeeper()
-                .withNetworkAliases(kafkaServiceName)
+                .withNetworkAliases(containerName)
                 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
-                    .withName(kafkaServiceName)
-                    .withHostName(cluster.getClusterName() + "-" + kafkaServiceName));
+                    .withName(containerName)
+                    .withHostName(cluster.getClusterName() + "-" + containerName));
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index f5854e7..cc96c57 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -43,7 +43,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 @Slf4j
 public class KafkaSourceTester extends SourceTester<KafkaContainer> {
 
-    private static final String NAME = "kafka";
+    private static final String SOURCE_TYPE = "kafka";
 
     private final String kafkaTopicName;
 
@@ -51,12 +51,12 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> {
 
     private KafkaConsumer<String, String> kafkaConsumer;
 
-    public KafkaSourceTester() {
-        super(NAME);
+    public KafkaSourceTester(String containerName) {
+        super(SOURCE_TYPE);
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
         this.kafkaTopicName = "kafka_source_topic_" + suffix;
 
-        sourceConfig.put("bootstrapServers", NAME + ":9092");
+        sourceConfig.put("bootstrapServers", containerName + ":9092");
         sourceConfig.put("groupId", "test-source-group");
         sourceConfig.put("fetchMinBytes", 1L);
         sourceConfig.put("autoCommitIntervalMs", 10L);