You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/18 21:50:56 UTC
[incubator-pulsar] branch master updated: [tests] improve connector
related integration tests (#2587)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7530d64 [tests] improve connector related integration tests (#2587)
7530d64 is described below
commit 7530d64a679a0783122b18058c1148c89c0fee0a
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Sep 18 14:50:51 2018 -0700
[tests] improve connector related integration tests (#2587)
*Motivation*
As more and more connectors are added, it becomes expensive to start all external services at the beginning.
*Changes*
- refactor the connector testing framework to start external service before methods
- fix kafka, cassandra and mysql connectors
---
distribution/io/src/assemble/io.xml | 6 +
.../broker/service/PersistentQueueE2ETest.java | 60 ----------
.../functions/instance/JavaInstanceRunnable.java | 3 +-
.../pulsar/functions/source/TopicSchema.java | 3 +
...rchAbstractSink.java => ElasticSearchSink.java} | 9 +-
.../io/elasticsearch/ElasticSearchStringSink.java | 35 ------
.../resources/META-INF/services/pulsar-io.yaml | 4 +-
.../io/elasticsearch/ElasticSearchSinkTests.java | 4 +-
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 6 +-
.../{KafkaStringSink.java => KafkaBytesSink.java} | 25 ++++-
.../resources/META-INF/services/pulsar-io.yaml | 2 +-
site2/docs/io-quickstart.md | 2 +-
.../version-2.1.0-incubating/io-quickstart.md | 2 +-
.../integration/functions/PulsarFunctionsTest.java | 39 +++++--
.../functions/utils/CommandGenerator.java | 2 +-
.../integration/io/CassandraSinkArchiveTester.java | 121 ---------------------
.../tests/integration/io/CassandraSinkTester.java | 45 +++++---
.../integration/io/ElasticSearchSinkTester.java | 34 +++---
.../tests/integration/io/HdfsSinkTester.java | 22 ++--
.../tests/integration/io/JdbcSinkTester.java | 38 ++++---
.../tests/integration/io/KafkaSinkTester.java | 28 +++--
.../tests/integration/io/KafkaSourceTester.java | 12 +-
.../pulsar/tests/integration/io/SinkTester.java | 25 ++++-
.../pulsar/tests/integration/io/SourceTester.java | 4 +-
.../tests/integration/suites/PulsarTestSuite.java | 50 ---------
.../integration/topologies/PulsarCluster.java | 17 +++
26 files changed, 212 insertions(+), 386 deletions(-)
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 08ff859..a509e19 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -92,5 +92,11 @@
<outputDirectory>connectors</outputDirectory>
<fileMode>644</fileMode>
</file>
+
+ <file>
+ <source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source>
+ <outputDirectory>connectors</outputDirectory>
+ <fileMode>644</fileMode>
+ </file>
</files>
</assembly>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 2937ca0..fb7a76c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -233,66 +233,6 @@ public class PersistentQueueE2ETest extends BrokerTestBase {
deleteTopic(topicName);
}
- @Test
- public void testConsumersWithDifferentPermits() throws Exception {
- final String topicName = "persistent://prop/use/ns-abc/shared-topic4";
- final String subName = "sub4";
- final int numMsgs = 10000;
-
- final AtomicInteger msgCountConsumer1 = new AtomicInteger(0);
- final AtomicInteger msgCountConsumer2 = new AtomicInteger(0);
- final CountDownLatch latch = new CountDownLatch(numMsgs);
-
- int recvQ1 = 10;
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ1)
- .messageListener((consumer, msg) -> {
- msgCountConsumer1.incrementAndGet();
- try {
- consumer.acknowledge(msg);
- latch.countDown();
- } catch (PulsarClientException e) {
- fail("Should not fail");
- }
- }).subscribe();
-
- int recvQ2 = 1;
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
- .subscriptionType(SubscriptionType.Shared).receiverQueueSize(recvQ2)
- .messageListener((consumer, msg) -> {
- msgCountConsumer2.incrementAndGet();
- try {
- consumer.acknowledge(msg);
- latch.countDown();
- } catch (PulsarClientException e) {
- fail("Should not fail");
- }
- }).subscribe();
-
- List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs);
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .enableBatching(false)
- .maxPendingMessages(numMsgs + 1)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
- for (int i = 0; i < numMsgs; i++) {
- String message = "msg-" + i;
- futures.add(producer.sendAsync(message.getBytes()));
- }
- FutureUtil.waitForAll(futures).get();
- producer.close();
-
- latch.await(5, TimeUnit.SECONDS);
-
- assertEquals(msgCountConsumer1.get(), numMsgs - numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1);
- assertEquals(msgCountConsumer2.get(), numMsgs / (recvQ1 + recvQ2), numMsgs * 0.1);
-
- consumer1.close();
- consumer2.close();
-
- deleteTopic(topicName);
- }
-
// this test is good to have to see the distribution, but every now and then it gets slightly different than the
// expected numbers. keeping this disabled to not break the build, but nevertheless this gives good insight into
// how the round robin distribution algorithm is behaving
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4ba7340..b3f86ea 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -146,7 +146,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
ThreadContext.put("instance", instanceConfig.getInstanceId());
- log.info("Starting Java Instance {}", instanceConfig.getFunctionDetails().getName());
+ log.info("Starting Java Instance {} : \n Details = {}",
+ instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
// start the function thread
loadJars();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 76375dc..db9e880 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -78,6 +78,9 @@ public class TopicSchema {
private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
if (GenericRecord.class.isAssignableFrom(clazz)) {
return SchemaType.AUTO;
+ } else if (byte[].class.equals(clazz)) {
+ // if the function uses raw bytes, skip the topic schema lookup
+ return SchemaType.NONE;
} else {
Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join();
if (schema.isPresent()) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
similarity index 94%
rename from pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
rename to pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 3760d40..86546f3 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAbstractSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -51,7 +51,7 @@ import org.elasticsearch.common.xcontent.XContentType;
* Users need to implement extractKeyValue function to use this sink.
* This class assumes that the input will be JSON documents
*/
-public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
+public class ElasticSearchSink implements Sink<byte[]> {
protected static final String DOCUMENT = "doc";
@@ -74,7 +74,7 @@ public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
@Override
public void write(Record<byte[]> record) {
- KeyValue<K, V> keyValue = extractKeyValue(record);
+ KeyValue<String, byte[]> keyValue = extractKeyValue(record);
IndexRequest indexRequest = Requests.indexRequest(elasticSearchConfig.getIndexName());
indexRequest.type(DOCUMENT);
indexRequest.source(keyValue.getValue(), XContentType.JSON);
@@ -91,7 +91,10 @@ public abstract class ElasticSearchAbstractSink<K, V> implements Sink<byte[]> {
}
}
- public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
+ /**
+ * Pairs the record's optional key with its raw value bytes.
+ *
+ * @param record the incoming record
+ * @return a key/value pair whose key is {@code null} when the record has no key
+ */
+ public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+ // orElse(null), not orElseGet(null): a null Supplier throws NullPointerException when the key is absent.
+ String key = record.getKey().orElse(null);
+ return new KeyValue<>(key, record.getValue());
+ }
private void createIndexIfNeeded() throws IOException {
GetIndexRequest request = new GetIndexRequest();
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
deleted file mode 100644
index 6cfa03d..0000000
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchStringSink.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * Concrete ElasticSearch sink.
- * This class assumes that the input will be JSON documents
- */
-public class ElasticSearchStringSink extends ElasticSearchAbstractSink<String, String> {
-
- @Override
- public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
- String key = record.getKey().orElseGet(() -> new String(record.getValue()));
- return new KeyValue<>(key, new String(record.getValue()));
- }
-}
diff --git a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
index 0307516..97789e9 100644
--- a/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/elastic-search/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
# under the License.
#
-name: Elastic Search
+name: elastic_search
description: Writes data into Elastic Search
-sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchStringSink
+sinkClass: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index ea2b886..f188829 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -56,7 +56,7 @@ public class ElasticSearchSinkTests {
@Mock
protected SinkContext mockSinkContext;
protected Map<String, Object> map;
- protected ElasticSearchStringSink sink;
+ protected ElasticSearchSink sink;
@BeforeClass
public static final void init() {
@@ -71,7 +71,7 @@ public class ElasticSearchSinkTests {
public final void setUp() throws Exception {
map = new HashMap<String, Object> ();
map.put("elasticSearchUrl", "http://localhost:9200");
- sink = new ElasticSearchStringSink();
+ sink = new ElasticSearchSink();
mockRecord = mock(Record.class);
mockSinkContext = mock(SinkContext.class);
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index a92a368..50ce4b9 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -70,6 +70,10 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
}
}
+ protected Properties beforeCreateProducer(Properties props) {
+ return props;
+ }
+
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
kafkaSinkConfig = KafkaSinkConfig.load(config);
@@ -89,7 +93,7 @@ public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
- producer = new KafkaProducer<>(props);
+ producer = new KafkaProducer<>(beforeCreateProducer(props));
log.info("Kafka sink started.");
}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
similarity index 50%
rename from pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
rename to pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
index 89e3e7f..9ce2bdc 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
@@ -19,16 +19,31 @@
package org.apache.pulsar.io.kafka;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
/**
- * Kafka sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
+ * Kafka sink that treats incoming messages as pure bytes, so no schema
+ * is applied to them.
*/
-public class KafkaStringSink extends KafkaAbstractSink<String, String> {
+@Slf4j
+public class KafkaBytesSink extends KafkaAbstractSink<String, byte[]> {
+
+ @Override
+ protected Properties beforeCreateProducer(Properties props) {
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ log.info("Created kafka producer config : {}", props);
+ return props;
+ }
+
@Override
- public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
- return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue()));
+ public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
+ return new KeyValue<>(record.getKey().orElse(null), record.getValue());
}
}
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index a7bc813..7afc154 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,4 +20,4 @@
name: kafka
description: Kafka source and sink connector
sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
-sinkClass: org.apache.pulsar.io.kafka.KafkaStringSink
+sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink
diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md
index 8b8cfd3..4a48bc0 100644
--- a/site2/docs/io-quickstart.md
+++ b/site2/docs/io-quickstart.md
@@ -123,7 +123,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
Example output:
```json
-[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connect [...]
+[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connecto [...]
```
If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`,
diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
index 7b21379..afa8e31 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/io-quickstart.md
@@ -124,7 +124,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors
Example output:
```json
-[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connect [...]
+[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaBytesSink"},{"name":"kinesis","description":"Kinesis sink connecto [...]
```
If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 5f11eef..e9ea618 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.tests.integration.io.*;
import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;
/**
@@ -62,17 +63,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
@Test
public void testKafkaSink() throws Exception {
- testSink(new KafkaSinkTester(), true);
+ testSink(new KafkaSinkTester(), true, new KafkaSourceTester());
}
@Test
public void testCassandraSink() throws Exception {
- testSink(new CassandraSinkTester(), true);
+ testSink(CassandraSinkTester.createTester(true), true);
}
@Test
public void testCassandraArchiveSink() throws Exception {
- testSink(new CassandraSinkArchiveTester(), false);
+ testSink(CassandraSinkTester.createTester(false), false);
}
@Test(enabled = false)
@@ -91,8 +92,31 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
private void testSink(SinkTester tester, boolean builtin) throws Exception {
- tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
+ tester.startServiceContainer(pulsarCluster);
+ try {
+ runSinkTester(tester, builtin);
+ } finally {
+ tester.stopServiceContainer(pulsarCluster);
+ }
+ }
+
+ private <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
+ boolean builtinSink,
+ SourceTester<ServiceContainerT> sourceTester)
+ throws Exception {
+ ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
+ try {
+ runSinkTester(sinkTester, builtinSink);
+ if (null != sourceTester) {
+ sourceTester.setServiceContainer(serviceContainer);
+ testSource(sourceTester);
+ }
+ } finally {
+ sinkTester.stopServiceContainer(pulsarCluster);
+ }
+ }
+ private void runSinkTester(SinkTester tester, boolean builtin) throws Exception {
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String inputTopicName = "test-sink-connector-"
@@ -357,14 +381,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// Source Test
//
- @Test
- public void testKafkaSource() throws Exception {
- testSource(new KafkaSourceTester());
- }
-
private void testSource(SourceTester tester) throws Exception {
- tester.findSourceServiceContainer(pulsarCluster.getExternalServices());
-
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "test-source-connector-"
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6578d08..6f4d012 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -161,7 +161,7 @@ public class CommandGenerator {
}
public String generateUpdateFunctionCommand(String codeFile) {
- StringBuilder commandBuilder = new StringBuilder("PULSAR_MEM=-Xmx1024m ");
+ StringBuilder commandBuilder = new StringBuilder();
if (adminUrl == null) {
commandBuilder.append("/pulsar/bin/pulsar-admin functions update");
} else {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
deleted file mode 100644
index 86c7689..0000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkArchiveTester.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
-
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * A tester for testing cassandra sink submitted as an archive.
- */
-@Slf4j
-public class CassandraSinkArchiveTester extends SinkTester {
-
- private static final String NAME = "cassandra";
-
- private static final String ROOTS = "cassandra";
- private static final String KEY = "key";
- private static final String COLUMN = "col";
-
- private final String keySpace;
- private final String tableName;
-
- private CassandraContainer cassandraCluster;
-
- private Cluster cluster;
- private Session session;
-
- public CassandraSinkArchiveTester() {
- super("/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
-
- String suffix = randomName(8) + "_" + System.currentTimeMillis();
- this.keySpace = "keySpace_" + suffix;
- this.tableName = "tableName_" + suffix;
-
- sinkConfig.put("roots", ROOTS);
- sinkConfig.put("keyspace", keySpace);
- sinkConfig.put("columnFamily", tableName);
- sinkConfig.put("keyname", KEY);
- sinkConfig.put("columnName", COLUMN);
- }
-
- @Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get(NAME);
- checkState(container instanceof CassandraContainer,
- "No kafka service found in the cluster");
-
- this.cassandraCluster = (CassandraContainer) container;
- }
-
- @Override
- public void prepareSink() {
- // build the sink
- cluster = Cluster.builder()
- .addContactPoint("localhost")
- .withPort(cassandraCluster.getCassandraPort())
- .build();
-
- // connect to the cluster
- session = cluster.connect();
- log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort());
-
- String createKeySpace =
- "CREATE KEYSPACE " + keySpace
- + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; ";
- log.info(createKeySpace);
- session.execute(createKeySpace);
- session.execute("USE " + keySpace);
-
- String createTable = "CREATE TABLE " + tableName
- + "(" + KEY + " text PRIMARY KEY, "
- + COLUMN + " text);";
- log.info(createTable);
- session.execute(createTable);
- }
-
- @Override
- public void validateSinkResult(Map<String, String> kvs) {
- String query = "SELECT * FROM " + tableName + ";";
- ResultSet result = session.execute(query);
- List<Row> rows = result.all();
- assertEquals(kvs.size(), rows.size());
- for (Row row : rows) {
- String key = row.getString(KEY);
- String value = row.getString(COLUMN);
-
- String expectedValue = kvs.get(key);
- assertNotNull(expectedValue);
- assertEquals(expectedValue, value);
- }
- }
-}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
index c9d3e5a..3309358 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/CassandraSinkTester.java
@@ -24,12 +24,11 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import java.util.List;
import java.util.Map;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -38,7 +37,15 @@ import static org.junit.Assert.assertNotNull;
* A tester for testing cassandra sink.
*/
@Slf4j
-public class CassandraSinkTester extends SinkTester {
+public class CassandraSinkTester extends SinkTester<CassandraContainer> {
+
+ public static CassandraSinkTester createTester(boolean builtin) {
+ if (builtin) {
+ return new CassandraSinkTester(builtin);
+ } else {
+ return new CassandraSinkTester();
+ }
+ }
private static final String NAME = "cassandra";
@@ -49,13 +56,11 @@ public class CassandraSinkTester extends SinkTester {
private final String keySpace;
private final String tableName;
- private CassandraContainer cassandraCluster;
-
private Cluster cluster;
private Session session;
- public CassandraSinkTester() {
- super(SinkType.CASSANDRA);
+ private CassandraSinkTester() {
+ super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
@@ -68,13 +73,23 @@ public class CassandraSinkTester extends SinkTester {
sinkConfig.put("columnName", COLUMN);
}
- @Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get(NAME);
- checkState(container instanceof CassandraContainer,
- "No kafka service found in the cluster");
+ private CassandraSinkTester(boolean builtin) {
+ super(NAME, SinkType.CASSANDRA);
+
+ String suffix = randomName(8) + "_" + System.currentTimeMillis();
+ this.keySpace = "keySpace_" + suffix;
+ this.tableName = "tableName_" + suffix;
- this.cassandraCluster = (CassandraContainer) container;
+ sinkConfig.put("roots", ROOTS);
+ sinkConfig.put("keyspace", keySpace);
+ sinkConfig.put("columnFamily", tableName);
+ sinkConfig.put("keyname", KEY);
+ sinkConfig.put("columnName", COLUMN);
+ }
+
+ @Override
+ protected CassandraContainer createSinkService(PulsarCluster cluster) {
+ return new CassandraContainer(cluster.getClusterName());
}
@Override
@@ -82,12 +97,12 @@ public class CassandraSinkTester extends SinkTester {
// build the sink
cluster = Cluster.builder()
.addContactPoint("localhost")
- .withPort(cassandraCluster.getCassandraPort())
+ .withPort(serviceContainer.getCassandraPort())
.build();
// connect to the cluster
session = cluster.connect();
- log.info("Connecting to cassandra cluster at localhost:{}", cassandraCluster.getCassandraPort());
+ log.info("Connecting to cassandra cluster at localhost:{}", serviceContainer.getCassandraPort());
String createKeySpace =
"CREATE KEYSPACE " + keySpace
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
index 0effc8e..eee208e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/ElasticSearchSinkTester.java
@@ -18,57 +18,57 @@
*/
package org.apache.pulsar.tests.integration.io;
-import static com.google.common.base.Preconditions.checkState;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import java.util.Map;
-import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
-import org.testcontainers.containers.GenericContainer;
-public class ElasticSearchSinkTester extends SinkTester {
-
+public class ElasticSearchSinkTester extends SinkTester<ElasticSearchContainer> {
+
private RestHighLevelClient elasticClient;
public ElasticSearchSinkTester() {
- super(SinkType.ELASTIC_SEARCH);
+ super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH);
- sinkConfig.put("elasticSearchUrl", "http://localhost:9200");
+ sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200");
sinkConfig.put("indexName", "test-index");
}
+
@Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices) {
- GenericContainer<?> container = externalServices.get(ElasticSearchContainer.NAME);
- checkState(container instanceof ElasticSearchContainer,
- "No ElasticSearch service found in the cluster");
+ protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
+ return new ElasticSearchContainer(cluster.getClusterName());
}
@Override
public void prepareSink() throws Exception {
- RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
+ RestClientBuilder builder = RestClient.builder(
+ new HttpHost(
+ "localhost",
+ serviceContainer.getMappedPort(9200),
+ "http"));
elasticClient = new RestHighLevelClient(builder);
}
@Override
public void validateSinkResult(Map<String, String> kvs) {
-
SearchRequest searchRequest = new SearchRequest("test-index");
searchRequest.types("doc");
try {
- Header headers = null;
- SearchResponse searchResult = elasticClient.search(searchRequest, headers);
- assertTrue(searchResult.getHits().getTotalHits() > 0);
+ SearchResponse searchResult = elasticClient.search(searchRequest);
+ assertTrue(searchResult.getHits().getTotalHits() > 0, searchResult.toString());
} catch (Exception e) {
- e.printStackTrace();
+ fail("Encountered exception on validating elastic search results", e);
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
index 46c5f24..957b93a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -21,18 +21,14 @@ package org.apache.pulsar.tests.integration.io;
import java.util.Map;
import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import static com.google.common.base.Preconditions.checkState;
-
-public class HdfsSinkTester extends SinkTester {
+public class HdfsSinkTester extends SinkTester<HdfsContainer> {
private static final String NAME = "HDFS";
- private HdfsContainer hdfsCluster;
-
public HdfsSinkTester() {
- super(SinkType.HDFS);
+ super(NAME, SinkType.HDFS);
// TODO How do I get the core-site.xml, and hdfs-site.xml files from the container?
sinkConfig.put("hdfsConfigResources", "");
@@ -40,20 +36,18 @@ public class HdfsSinkTester extends SinkTester {
}
@Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get(NAME);
- checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster");
- this.hdfsCluster = (HdfsContainer) container;
+ protected HdfsContainer createSinkService(PulsarCluster cluster) {
+ return new HdfsContainer(cluster.getClusterName());
}
@Override
public void prepareSink() throws Exception {
// Create the test directory
- hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing");
- hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing");
+ serviceContainer.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing");
+ serviceContainer.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing");
// Execute all future commands as the "tester" user
- hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+ serviceContainer.execInContainer("export HADOOP_USER_NAME=tester");
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 7c14ba9..72d9b01 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -18,22 +18,23 @@
*/
package org.apache.pulsar.tests.integration.io;
-import static com.google.common.base.Preconditions.checkState;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
+import com.github.dockerjava.api.command.CreateContainerCmd;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;
+import java.util.function.Consumer;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.testcontainers.containers.GenericContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.MySQLContainer;
/**
@@ -41,7 +42,7 @@ import org.testcontainers.containers.MySQLContainer;
* This will use MySql as DB server
*/
@Slf4j
-public class JdbcSinkTester extends SinkTester {
+public class JdbcSinkTester extends SinkTester<MySQLContainer> {
/**
* A Simple class to test jdbc class,
@@ -57,14 +58,14 @@ public class JdbcSinkTester extends SinkTester {
}
private static final String NAME = "jdbc";
+ private static final String MYSQL = "mysql";
- private MySQLContainer mySQLContainer;
private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
private String tableName = "test";
private Connection connection;
public JdbcSinkTester() {
- super(SinkType.JDBC);
+ super(NAME, SinkType.JDBC);
// container default value is test
sinkConfig.put("userName", "test");
@@ -79,21 +80,28 @@ public class JdbcSinkTester extends SinkTester {
}
@Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get("mysql");
- checkState(container instanceof MySQLContainer,
- "No MySQL service found in the cluster");
-
- this.mySQLContainer = (MySQLContainer) container;
- log.info("find sink service container: {}", mySQLContainer.getContainerName());
+ protected MySQLContainer createSinkService(PulsarCluster cluster) {
+ return (MySQLContainer) new MySQLContainer()
+ .withUsername("test")
+ .withPassword("test")
+ .withDatabaseName("test")
+ .withNetworkAliases(MYSQL)
+ .withCreateContainerCmdModifier(new Consumer<CreateContainerCmd>() {
+ @Override
+ public void accept(CreateContainerCmd createContainerCmd) {
+ createContainerCmd
+ .withName(MYSQL)
+ .withHostName(cluster.getClusterName() + "-" + MYSQL);
+ }
+ });
}
@Override
public void prepareSink() throws Exception {
- String jdbcUrl = mySQLContainer.getJdbcUrl();
+ String jdbcUrl = serviceContainer.getJdbcUrl();
// we need set mysql server address in cluster network.
- sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test");
- String driver = mySQLContainer.getDriverClassName();
+ sinkConfig.put("jdbcUrl", "jdbc:mysql://" + MYSQL + ":3306/test");
+ String driver = serviceContainer.getDriverClassName();
Class.forName(driver);
connection = DriverManager.getConnection(jdbcUrl, "test", "test");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
index ff79e1a..6713cc1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSinkTester.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.tests.integration.io;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -32,8 +31,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -41,18 +40,15 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
* A tester for testing kafka sink.
*/
@Slf4j
-public class KafkaSinkTester extends SinkTester {
+public class KafkaSinkTester extends SinkTester<KafkaContainer> {
private static final String NAME = "kafka";
private final String kafkaTopicName;
-
- private KafkaContainer kafkaContainer;
-
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaSinkTester() {
- super(SinkType.KAFKA);
+ super(NAME, SinkType.KAFKA);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.kafkaTopicName = "kafka_sink_topic_" + suffix;
@@ -64,17 +60,19 @@ public class KafkaSinkTester extends SinkTester {
}
@Override
- public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get(NAME);
- checkState(container instanceof KafkaContainer,
- "No kafka service found in the cluster");
-
- this.kafkaContainer = (KafkaContainer) container;
+ protected KafkaContainer createSinkService(PulsarCluster cluster) {
+ final String kafkaServiceName = NAME;
+ return new KafkaContainer()
+ .withEmbeddedZookeeper()
+ .withNetworkAliases(kafkaServiceName)
+ .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+ .withName(kafkaServiceName)
+ .withHostName(cluster.getClusterName() + "-" + kafkaServiceName));
}
@Override
public void prepareSink() throws Exception {
- ExecResult execResult = kafkaContainer.execInContainer(
+ ExecResult execResult = serviceContainer.execInContainer(
"/usr/bin/kafka-topics",
"--create",
"--zookeeper",
@@ -91,7 +89,7 @@ public class KafkaSinkTester extends SinkTester {
kafkaConsumer = new KafkaConsumer<>(
ImmutableMap.of(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(),
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
),
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
index 4928f00..cee17b1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/KafkaSourceTester.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.tests.integration.io;
-import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertTrue;
@@ -35,7 +34,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.Container.ExecResult;
-import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -43,7 +41,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
* A tester for testing kafka source.
*/
@Slf4j
-public class KafkaSourceTester extends SourceTester {
+public class KafkaSourceTester extends SourceTester<KafkaContainer> {
private static final String NAME = "kafka";
@@ -68,12 +66,8 @@ public class KafkaSourceTester extends SourceTester {
}
@Override
- public void findSourceServiceContainer(Map<String, GenericContainer<?>> containers) {
- GenericContainer<?> container = containers.get(NAME);
- checkState(container instanceof KafkaContainer,
- "No kafka service found in the cluster");
-
- this.kafkaContainer = (KafkaContainer) container;
+ public void setServiceContainer(KafkaContainer container) {
+ this.kafkaContainer = container;
}
@Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index d2917e6..2dd0759 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.io;
import java.util.Map;
import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.GenericContainer;
import org.testng.collections.Maps;
@@ -28,7 +29,7 @@ import org.testng.collections.Maps;
* A tester used for testing a specific sink.
*/
@Getter
-public abstract class SinkTester {
+public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
public enum SinkType {
UNDEFINED,
@@ -39,19 +40,23 @@ public abstract class SinkTester {
ELASTIC_SEARCH
}
+ protected final String networkAlias;
protected final SinkType sinkType;
protected final String sinkArchive;
protected final String sinkClassName;
protected final Map<String, Object> sinkConfig;
+ protected ServiceContainerT serviceContainer;
- public SinkTester(SinkType sinkType) {
+ public SinkTester(String networkAlias, SinkType sinkType) {
+ this.networkAlias = networkAlias;
this.sinkType = sinkType;
this.sinkArchive = null;
this.sinkClassName = null;
this.sinkConfig = Maps.newHashMap();
}
- public SinkTester(String sinkArchive, String sinkClassName) {
+ public SinkTester(String networkAlias, String sinkArchive, String sinkClassName) {
+ this.networkAlias = networkAlias;
this.sinkType = SinkType.UNDEFINED;
this.sinkArchive = sinkArchive;
this.sinkClassName = sinkClassName;
@@ -62,7 +67,19 @@ public abstract class SinkTester {
return Schema.STRING;
}
- public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices);
+ protected abstract ServiceContainerT createSinkService(PulsarCluster cluster);
+
+ public ServiceContainerT startServiceContainer(PulsarCluster cluster) {
+ this.serviceContainer = createSinkService(cluster);
+ cluster.startService(networkAlias, serviceContainer);
+ return serviceContainer;
+ }
+
+ public void stopServiceContainer(PulsarCluster cluster) {
+ if (null != serviceContainer) {
+ cluster.stopService(networkAlias, serviceContainer);
+ }
+ }
public SinkType sinkType() {
return sinkType;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
index dc58f2f..f1feb70 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SourceTester.java
@@ -27,7 +27,7 @@ import org.testng.collections.Maps;
* A tester used for testing a specific source.
*/
@Getter
-public abstract class SourceTester {
+public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
protected final String sourceType;
protected final Map<String, Object> sourceConfig;
@@ -37,7 +37,7 @@ public abstract class SourceTester {
this.sourceConfig = Maps.newHashMap();
}
- public abstract void findSourceServiceContainer(Map<String, GenericContainer<?>> externalServices);
+ public abstract void setServiceContainer(ServiceContainerT serviceContainer);
public String sourceType() {
return sourceType;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index b4a6b83..20b9da0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -18,19 +18,10 @@
*/
package org.apache.pulsar.tests.integration.suites;
-import java.util.Map;
-import org.apache.pulsar.tests.integration.containers.CassandraContainer;
-import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
-import org.apache.pulsar.tests.integration.containers.HdfsContainer;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.MySQLContainer;
import org.testng.ITest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
-import org.testng.collections.Maps;
public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
@@ -47,47 +38,6 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
}
@Override
- protected PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpecBuilder specBuilder) {
- PulsarClusterSpecBuilder builder = super.beforeSetupCluster(clusterName, specBuilder);
-
- // start functions
-
- // register external services
- Map<String, GenericContainer<?>> externalServices = Maps.newHashMap();
-
- final String kafkaServiceName = "kafka";
- externalServices.put(
- kafkaServiceName,
- new KafkaContainer()
- .withEmbeddedZookeeper()
- .withNetworkAliases(kafkaServiceName)
- .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
- .withName(kafkaServiceName)
- .withHostName(clusterName + "-" + kafkaServiceName)));
-
- final String cassandraServiceName = "cassandra";
- externalServices.put(
- cassandraServiceName,
- new CassandraContainer(clusterName));
-
- // use mySQL for jdbc test
- final String jdbcServiceName = "mysql";
- externalServices.put(
- jdbcServiceName,
- new MySQLContainer()
- .withExposedPorts(3306));
-
- externalServices.put(
- ElasticSearchContainer.NAME,
- new ElasticSearchContainer(ElasticSearchContainer.NAME)
- .withExposedPorts(9200));
-
- builder = builder.externalServices(externalServices);
-
- return builder;
- }
-
- @Override
public String getTestName() {
return "pulsar-test-suite";
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index f78097b..af60712 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -209,6 +209,23 @@ public class PulsarCluster {
}
}
+ public void startService(String networkAlias,
+ GenericContainer<?> serviceContainer) {
+ log.info("Starting external service {} ...", networkAlias);
+ serviceContainer.withNetwork(network);
+ serviceContainer.withNetworkAliases(networkAlias);
+ serviceContainer.start();
+ log.info("Successfully start external service {}", networkAlias);
+ }
+
+ public void stopService(String networkAlias,
+ GenericContainer<?> serviceContainer) {
+ log.info("Stopping external service {} ...", networkAlias);
+ serviceContainer.stop();
+ log.info("Successfully stop external service {}", networkAlias);
+ }
+
+
private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName,
int numContainers,
Function<String, T> containerCreator) {