You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 04:43:55 UTC
[pulsar] branch master updated: [Tests] Fix resource leaks in Pulsar IO integration tests (#15866)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4fe26b98d [Tests] Fix resource leaks in Pulsar IO integration tests (#15866)
0a4fe26b98d is described below
commit 0a4fe26b98d8a0bc7e8a6dd6373c5e1710b7ebe9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jun 1 07:43:49 2022 +0300
[Tests] Fix resource leaks in Pulsar IO integration tests (#15866)
* [Tests] Fix resource leaks in Pulsar IO integration tests
---
.../tests/integration/io/PulsarIOTestBase.java | 48 +++++++++++--------
.../tests/integration/io/RabbitMQSinkTester.java | 5 ++
.../tests/integration/io/RabbitMQSourceTester.java | 5 ++
.../integration/io/sinks/CassandraSinkTester.java | 12 +++++
.../io/sinks/ElasticSearchSinkTester.java | 7 +++
.../tests/integration/io/sinks/HdfsSinkTester.java | 4 ++
.../io/sinks/JdbcPostgresSinkTester.java | 8 ++++
.../integration/io/sinks/KafkaSinkTester.java | 56 ++++++++++++----------
.../integration/io/sinks/KinesisSinkTester.java | 8 ++++
.../integration/io/sinks/OpenSearchSinkTester.java | 9 +++-
.../tests/integration/io/sinks/SinkTester.java | 4 +-
.../integration/io/sources/KafkaSourceTester.java | 8 ++++
.../tests/integration/io/sources/SourceTester.java | 2 +-
.../debezium/DebeziumMongoDbSourceTester.java | 11 +++--
.../debezium/DebeziumMsSqlSourceTester.java | 11 +++--
.../debezium/DebeziumMySqlSourceTester.java | 9 ++--
.../debezium/DebeziumOracleDbSourceTester.java | 11 +++--
.../debezium/DebeziumPostgreSqlSourceTester.java | 13 ++---
.../debezium/PulsarIODebeziumSourceRunner.java | 18 +++++--
19 files changed, 170 insertions(+), 79 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
index cd4e3ba9421..d02bc7444f5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
@@ -27,34 +27,44 @@ import org.testcontainers.containers.GenericContainer;
public abstract class PulsarIOTestBase extends PulsarFunctionsTestBase {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected void testSink(SinkTester tester, boolean builtin) throws Exception {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.startServiceContainer(pulsarCluster);
try {
- PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
+ PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
runner.runSinkTester(tester, builtin);
} finally {
- tester.stopServiceContainer(pulsarCluster);
+ try {
+ tester.close();
+ } finally {
+ tester.stopServiceContainer(pulsarCluster);
+ }
}
}
- @SuppressWarnings("rawtypes")
- protected <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
- boolean builtinSink,
- SourceTester<ServiceContainerT> sourceTester) throws Exception {
+ @SuppressWarnings("rawtypes")
+ protected <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
+ boolean builtinSink,
+ SourceTester<ServiceContainerT> sourceTester)
+ throws Exception {
- ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
+ ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
- try {
- PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
+ try {
+ PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
runner.runSinkTester(sinkTester, builtinSink);
- if (null != sourceTester) {
- PulsarIOSourceRunner sourceRunner = new PulsarIOSourceRunner(pulsarCluster, functionRuntimeType.toString());
- sourceTester.setServiceContainer(serviceContainer);
- sourceRunner.testSource(sourceTester);
- }
- } finally {
- sinkTester.stopServiceContainer(pulsarCluster);
- }
+ if (null != sourceTester) {
+ PulsarIOSourceRunner sourceRunner =
+ new PulsarIOSourceRunner(pulsarCluster, functionRuntimeType.toString());
+ sourceTester.setServiceContainer(serviceContainer);
+ sourceRunner.testSource(sourceTester);
+ }
+ } finally {
+ try {
+ sinkTester.close();
+ } finally {
+ sinkTester.stopServiceContainer(pulsarCluster);
+ }
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
index 3b698dc22d1..f186360b950 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
@@ -106,4 +106,9 @@ public class RabbitMQSinkTester extends SinkTester<RabbitMQContainer> {
private final String key;
private final byte[] body;
}
+
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
index 12d1bbabb9f..a4508fd590f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
@@ -89,4 +89,9 @@ public class RabbitMQSourceTester extends SourceTester<RabbitMQContainer> {
return values;
}
}
+
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
index 97f492bc762..d545ddfd08f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
@@ -133,4 +133,16 @@ public class CassandraSinkTester extends SinkTester<CassandraContainer> {
assertEquals(expectedValue, value);
}
}
+
+ @Override
+ public void close() throws Exception {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (cluster != null) {
+ cluster.close();
+ cluster = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 07c5c6e9b12..65822817b08 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -176,4 +176,11 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
}
}
+ @Override
+ public void close() throws Exception {
+ if (elasticClient != null) {
+ elasticClient._transport().close();
+ elasticClient = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
index 670200cd0b6..b6320a58e2e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
@@ -56,4 +56,8 @@ public class HdfsSinkTester extends SinkTester<HdfsContainer> {
}
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
index e67626580b7..80e04c485ea 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
@@ -191,4 +191,12 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
public boolean isKeyValueSchema() {
return keyValueSchema;
}
+
+ @Override
+ public void close() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
index ab66d3dead0..0fbf5cdcdb6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.tests.integration.io.sinks;
import static org.apache.pulsar.tests.integration.topologies.PulsarTestBase.randomName;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
@@ -32,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.tests.integration.io.sinks.SinkTester.SinkType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.KafkaContainer;
@@ -49,7 +47,7 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
private final String containerName;
- public KafkaSinkTester(String containerName) {
+ public KafkaSinkTester(String containerName) {
super(containerName, SinkType.KAFKA);
this.containerName = containerName;
String suffix = randomName(8) + "_" + System.currentTimeMillis();
@@ -68,35 +66,35 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
.withEmbeddedZookeeper()
.withNetworkAliases(containerName)
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
- .withName(containerName)
- .withHostName(cluster.getClusterName() + "-" + containerName));
+ .withName(containerName)
+ .withHostName(cluster.getClusterName() + "-" + containerName));
}
@Override
public void prepareSink() throws Exception {
ExecResult execResult = serviceContainer.execInContainer(
- "/usr/bin/kafka-topics",
- "--create",
- "--zookeeper",
- "localhost:2181",
- "--partitions",
- "1",
- "--replication-factor",
- "1",
- "--topic",
- kafkaTopicName);
+ "/usr/bin/kafka-topics",
+ "--create",
+ "--zookeeper",
+ "localhost:2181",
+ "--partitions",
+ "1",
+ "--replication-factor",
+ "1",
+ "--topic",
+ kafkaTopicName);
assertTrue(
- execResult.getStdout().contains("Created topic"),
- execResult.getStdout());
+ execResult.getStdout().contains("Created topic"),
+ execResult.getStdout());
kafkaConsumer = new KafkaConsumer<>(
- ImmutableMap.of(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
- ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
- ),
- new StringDeserializer(),
- new StringDeserializer()
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
+ ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+ ),
+ new StringDeserializer(),
+ new StringDeserializer()
);
kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
@@ -108,7 +106,7 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
while (kvIter.hasNext()) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1L));
log.info("Received {} records from kafka topic {}",
- records.count(), kafkaTopicName);
+ records.count(), kafkaTopicName);
if (records.isEmpty()) {
continue;
}
@@ -122,4 +120,12 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
}
}
}
+
+ @Override
+ public void close() throws Exception {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ kafkaConsumer = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index e0cb2f15eee..07aa1675750 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -270,4 +270,12 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
private Set<Long> set1;
private Map<String, String> map1;
}
+
+ @Override
+ public void close() throws Exception {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 3c2efa4c06b..dadf6d295a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -73,5 +73,12 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester {
});
}
-
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (elasticClient != null) {
+ elasticClient.close();
+ elasticClient = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
index fcf4b08fb90..ddb5bf0c04e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
@@ -34,7 +34,7 @@ import org.testng.collections.Maps;
* A tester used for testing a specific sink.
*/
@Getter
-public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
+public abstract class SinkTester<ServiceContainerT extends GenericContainer> implements AutoCloseable {
@Getter
public enum SinkType {
@@ -127,6 +127,4 @@ public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
.send();
}
}
-
-
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
index 238fe7000b4..b7ec88909fc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
@@ -142,4 +142,12 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> {
log.info("Successfully produced {} messages to kafka topic {}", numMessages, kafkaTopicName);
return kvs;
}
+
+ @Override
+ public void close() throws Exception {
+ if (kafkaConsumer != null) {
+ kafkaConsumer.close();
+ kafkaConsumer = null;
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 97613cb23db..0e4e1786ba0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -39,7 +39,7 @@ import org.testng.collections.Maps;
*/
@Getter
@Slf4j
-public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
+public abstract class SourceTester<ServiceContainerT extends GenericContainer> implements AutoCloseable {
public static final String INSERT = "INSERT";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 110ff11c00d..1dbefe10773 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;
+import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
@@ -25,11 +26,8 @@ import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import java.io.Closeable;
-import java.util.Map;
-
@Slf4j
-public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> implements Closeable {
+public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> {
private static final String NAME = "debezium-mongodb";
@@ -118,7 +116,10 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon
@Override
public void close() {
if (pulsarCluster != null) {
- pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+ if (debeziumMongoDbContainer != null) {
+ pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+ debeziumMongoDbContainer = null;
+ }
}
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index 5894d4f0ecb..d13e10e66fc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;
+import java.util.Map;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -30,14 +31,11 @@ import org.testcontainers.shaded.com.google.common.base.Preconditions;
import org.testng.Assert;
import org.testng.util.Strings;
-import java.io.Closeable;
-import java.util.Map;
-
/**
* A tester for testing Debezium Microsoft SQl Server source.
*/
@Slf4j
-public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContainer> implements Closeable {
+public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContainer> {
private static final String NAME = "debezium-mssql";
@@ -158,7 +156,10 @@ public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContain
@Override
public void close() {
if (pulsarCluster != null) {
- PulsarCluster.stopService(DebeziumMsSqlContainer.NAME, debeziumMsSqlContainer);
+ if (debeziumMsSqlContainer != null) {
+ PulsarCluster.stopService(DebeziumMsSqlContainer.NAME, debeziumMsSqlContainer);
+ debeziumMsSqlContainer = null;
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 7958fa01992..b56f2dea7f2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;
-import java.io.Closeable;
import java.util.Map;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -38,7 +36,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
* which is a MySQL database server preconfigured with an inventory database.
*/
@Slf4j
-public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> implements Closeable {
+public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> {
private static final String NAME = "debezium-mysql";
@@ -128,7 +126,10 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
@Override
public void close() {
if (pulsarCluster != null) {
- pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
+ if (debeziumMySqlContainer != null) {
+ pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
+ debeziumMySqlContainer = null;
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 40078d67365..115d3eba8ff 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;
+import java.util.Map;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -29,14 +30,11 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.shaded.com.google.common.base.Preconditions;
import org.testng.util.Strings;
-import java.io.Closeable;
-import java.util.Map;
-
/**
* A tester for testing Debezium OracleDb source.
*/
@Slf4j
-public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> implements Closeable {
+public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> {
private static final String NAME = "debezium-oracle";
private static final long SLEEP_AFTER_COMMAND_MS = 30_000;
@@ -247,7 +245,10 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC
@Override
public void close() {
if (pulsarCluster != null) {
- PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+ if (debeziumOracleDbContainer != null) {
+ PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+ debeziumOracleDbContainer = null;
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
index 318c57fab63..1f0a7fe3a59 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.io.sources.debezium;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
@@ -27,10 +29,6 @@ import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.Assert;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* A tester for testing Debezium Postgresql source.
*
@@ -41,7 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
* which is a Postgresql database server preconfigured with an inventory database.
*/
@Slf4j
-public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgreSqlContainer> implements Closeable {
+public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgreSqlContainer> {
private static final String NAME = "debezium-postgres";
@@ -155,7 +153,10 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre
@Override
public void close() {
if (pulsarCluster != null) {
- PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
+ if (debeziumPostgresqlContainer != null) {
+ PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
+ debeziumPostgresqlContainer = null;
+ }
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
index da1e7597ea6..bb2797b89ab 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.tests.integration.io.sources.debezium;
import com.google.common.base.Preconditions;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -28,10 +31,6 @@ import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.GenericContainer;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.failsafe.Failsafe;
-
@Slf4j
public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
@@ -63,7 +62,16 @@ public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester) throws Exception {
- // prepare the testing environment for source
+ try {
+ internalTestSource(sourceTester);
+ } finally {
+ sourceTester.close();
+ }
+ }
+
+ private <T extends GenericContainer> void internalTestSource
+ (SourceTester<T> sourceTester) throws Exception {
+ // prepare the testing environment for source
prepareSource(sourceTester);
// submit the source connector