You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/30 03:13:07 UTC
[pulsar] branch master updated: Use randomized names for kafka container in integration tests (#3412)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc3eb7f Use randomized names for kafka container in integration tests (#3412)
cc3eb7f is described below
commit cc3eb7f44b8ac884e83d9f0f127c216edd897d9d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 29 19:13:03 2019 -0800
Use randomized names for kafka container in integration tests (#3412)
* Use randomized names for kafka container in integration tests
* Revert change in KafkaSourceTester
* One more fix for source container names
* Use different name for different tests
---
.../integration/functions/PulsarFunctionsTest.java | 3 ++-
.../pulsar/tests/integration/io/KafkaSinkTester.java | 19 ++++++++++---------
.../tests/integration/io/KafkaSourceTester.java | 8 ++++----
3 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 23b7259..18eaba9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -73,7 +73,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
@Test
public void testKafkaSink() throws Exception {
- testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
+ String kafkaContainerName = "kafka-" + randomName(8);
+ testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName));
}
@Test(enabled = false)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index 6713cc1..9077146 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.io;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
+import static org.apache.pulsar.tests.integration.topologies.PulsarTestBase.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -42,17 +43,18 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@Slf4j
public class KafkaSinkTester extends SinkTester<KafkaContainer> {
- private static final String NAME = "kafka";
-
private final String kafkaTopicName;
private KafkaConsumer<String, String> kafkaConsumer;
- public KafkaSinkTester() {
- super(NAME, SinkType.KAFKA);
+ private final String containerName;
+
+ public KafkaSinkTester(String containerName) {
+ super(containerName, SinkType.KAFKA);
+ this.containerName = containerName;
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_sink_topic_" + suffix;
- sinkConfig.put("bootstrapServers", NAME + ":9092");
+ sinkConfig.put("bootstrapServers", networkAlias + ":9092");
sinkConfig.put("acks", "all");
sinkConfig.put("batchSize", 1L);
sinkConfig.put("maxRequestSize", 1048576L);
@@ -61,13 +63,12 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
@Override
protected KafkaContainer createSinkService(PulsarCluster cluster) {
- final String kafkaServiceName = NAME;
return new KafkaContainer()
.withEmbeddedZookeeper()
- .withNetworkAliases(kafkaServiceName)
+ .withNetworkAliases(containerName)
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
- .withName(kafkaServiceName)
- .withHostName(cluster.getClusterName() + "-" + kafkaServiceName));
+ .withName(containerName)
+ .withHostName(cluster.getClusterName() + "-" + containerName));
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index f5854e7..cc96c57 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -43,7 +43,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@Slf4j
public class KafkaSourceTester extends SourceTester<KafkaContainer> {
- private static final String NAME = "kafka";
+ private static final String SOURCE_TYPE = "kafka";
private final String kafkaTopicName;
@@ -51,12 +51,12 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> {
private KafkaConsumer<String, String> kafkaConsumer;
- public KafkaSourceTester() {
- super(NAME);
+ public KafkaSourceTester(String containerName) {
+ super(SOURCE_TYPE);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_source_topic_" + suffix;
- sourceConfig.put("bootstrapServers", NAME + ":9092");
+ sourceConfig.put("bootstrapServers", containerName + ":9092");
sourceConfig.put("groupId", "test-source-group");
sourceConfig.put("fetchMinBytes", 1L);
sourceConfig.put("autoCommitIntervalMs", 10L);