You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/01 04:43:55 UTC

[pulsar] branch master updated: [Tests] Fix resource leaks in Pulsar IO integration tests (#15866)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4fe26b98d [Tests] Fix resource leaks in Pulsar IO integration tests (#15866)
0a4fe26b98d is described below

commit 0a4fe26b98d8a0bc7e8a6dd6373c5e1710b7ebe9
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Jun 1 07:43:49 2022 +0300

    [Tests] Fix resource leaks in Pulsar IO integration tests (#15866)
    
    * [Tests] Fix resource leaks in Pulsar IO integration tests
---
 .../tests/integration/io/PulsarIOTestBase.java     | 48 +++++++++++--------
 .../tests/integration/io/RabbitMQSinkTester.java   |  5 ++
 .../tests/integration/io/RabbitMQSourceTester.java |  5 ++
 .../integration/io/sinks/CassandraSinkTester.java  | 12 +++++
 .../io/sinks/ElasticSearchSinkTester.java          |  7 +++
 .../tests/integration/io/sinks/HdfsSinkTester.java |  4 ++
 .../io/sinks/JdbcPostgresSinkTester.java           |  8 ++++
 .../integration/io/sinks/KafkaSinkTester.java      | 56 ++++++++++++----------
 .../integration/io/sinks/KinesisSinkTester.java    |  8 ++++
 .../integration/io/sinks/OpenSearchSinkTester.java |  9 +++-
 .../tests/integration/io/sinks/SinkTester.java     |  4 +-
 .../integration/io/sources/KafkaSourceTester.java  |  8 ++++
 .../tests/integration/io/sources/SourceTester.java |  2 +-
 .../debezium/DebeziumMongoDbSourceTester.java      | 11 +++--
 .../debezium/DebeziumMsSqlSourceTester.java        | 11 +++--
 .../debezium/DebeziumMySqlSourceTester.java        |  9 ++--
 .../debezium/DebeziumOracleDbSourceTester.java     | 11 +++--
 .../debezium/DebeziumPostgreSqlSourceTester.java   | 13 ++---
 .../debezium/PulsarIODebeziumSourceRunner.java     | 18 +++++--
 19 files changed, 170 insertions(+), 79 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
index cd4e3ba9421..d02bc7444f5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
@@ -27,34 +27,44 @@ import org.testcontainers.containers.GenericContainer;
 
 public abstract class PulsarIOTestBase extends PulsarFunctionsTestBase {
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	protected void testSink(SinkTester tester, boolean builtin) throws Exception {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.startServiceContainer(pulsarCluster);
         try {
-        	PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
+            PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
             runner.runSinkTester(tester, builtin);
         } finally {
-            tester.stopServiceContainer(pulsarCluster);
+            try {
+                tester.close();
+            } finally {
+                tester.stopServiceContainer(pulsarCluster);
+            }
         }
     }
 
-	@SuppressWarnings("rawtypes")
-	protected <ServiceContainerT extends GenericContainer>  void testSink(SinkTester<ServiceContainerT> sinkTester,
-			boolean builtinSink,
-			SourceTester<ServiceContainerT> sourceTester) throws Exception {
+    @SuppressWarnings("rawtypes")
+    protected <ServiceContainerT extends GenericContainer> void testSink(SinkTester<ServiceContainerT> sinkTester,
+                                                                         boolean builtinSink,
+                                                                         SourceTester<ServiceContainerT> sourceTester)
+            throws Exception {
 
-		ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
+        ServiceContainerT serviceContainer = sinkTester.startServiceContainer(pulsarCluster);
 
-		try {
-			PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
+        try {
+            PulsarIOSinkRunner runner = new PulsarIOSinkRunner(pulsarCluster, functionRuntimeType.toString());
             runner.runSinkTester(sinkTester, builtinSink);
-			if (null != sourceTester) {
-				PulsarIOSourceRunner sourceRunner = new PulsarIOSourceRunner(pulsarCluster, functionRuntimeType.toString());
-				sourceTester.setServiceContainer(serviceContainer);
-				sourceRunner.testSource(sourceTester);
-			}
-		} finally {
-			sinkTester.stopServiceContainer(pulsarCluster);
-		}
+            if (null != sourceTester) {
+                PulsarIOSourceRunner sourceRunner =
+                        new PulsarIOSourceRunner(pulsarCluster, functionRuntimeType.toString());
+                sourceTester.setServiceContainer(serviceContainer);
+                sourceRunner.testSource(sourceTester);
+            }
+        } finally {
+            try {
+                sinkTester.close();
+            } finally {
+                sinkTester.stopServiceContainer(pulsarCluster);
+            }
+        }
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
index 3b698dc22d1..f186360b950 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSinkTester.java
@@ -106,4 +106,9 @@ public class RabbitMQSinkTester extends SinkTester<RabbitMQContainer> {
         private final String key;
         private final byte[] body;
     }
+
+    @Override
+    public void close() throws Exception {
+
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
index 12d1bbabb9f..a4508fd590f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/RabbitMQSourceTester.java
@@ -89,4 +89,9 @@ public class RabbitMQSourceTester extends SourceTester<RabbitMQContainer> {
             return values;
         }
     }
+
+    @Override
+    public void close() throws Exception {
+
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
index 97f492bc762..d545ddfd08f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/CassandraSinkTester.java
@@ -133,4 +133,16 @@ public class CassandraSinkTester extends SinkTester<CassandraContainer> {
             assertEquals(expectedValue, value);
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (session != null) {
+            session.close();
+            session = null;
+        }
+        if (cluster != null) {
+            cluster.close();
+            cluster = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 07c5c6e9b12..65822817b08 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -176,4 +176,11 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
         }
     }
 
+    @Override
+    public void close() throws Exception {
+        if (elasticClient != null) {
+            elasticClient._transport().close();
+            elasticClient = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
index 670200cd0b6..b6320a58e2e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/HdfsSinkTester.java
@@ -56,4 +56,8 @@ public class HdfsSinkTester extends SinkTester<HdfsContainer> {
 
 	}
 
+	@Override
+	public void close() throws Exception {
+
+	}
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
index e67626580b7..80e04c485ea 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/JdbcPostgresSinkTester.java
@@ -191,4 +191,12 @@ public class JdbcPostgresSinkTester extends SinkTester<PostgreSQLContainer> {
     public boolean isKeyValueSchema() {
         return keyValueSchema;
     }
+
+    @Override
+    public void close() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
index ab66d3dead0..0fbf5cdcdb6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KafkaSinkTester.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.tests.integration.io.sinks;
 import static org.apache.pulsar.tests.integration.topologies.PulsarTestBase.randomName;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -32,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pulsar.tests.integration.io.sinks.SinkTester.SinkType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.KafkaContainer;
@@ -49,7 +47,7 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
 
     private final String containerName;
 
-      public KafkaSinkTester(String containerName) {
+    public KafkaSinkTester(String containerName) {
         super(containerName, SinkType.KAFKA);
         this.containerName = containerName;
         String suffix = randomName(8) + "_" + System.currentTimeMillis();
@@ -68,35 +66,35 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
                 .withEmbeddedZookeeper()
                 .withNetworkAliases(containerName)
                 .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
-                    .withName(containerName)
-                    .withHostName(cluster.getClusterName() + "-" + containerName));
+                        .withName(containerName)
+                        .withHostName(cluster.getClusterName() + "-" + containerName));
     }
 
     @Override
     public void prepareSink() throws Exception {
         ExecResult execResult = serviceContainer.execInContainer(
-            "/usr/bin/kafka-topics",
-            "--create",
-            "--zookeeper",
-            "localhost:2181",
-            "--partitions",
-            "1",
-            "--replication-factor",
-            "1",
-            "--topic",
-            kafkaTopicName);
+                "/usr/bin/kafka-topics",
+                "--create",
+                "--zookeeper",
+                "localhost:2181",
+                "--partitions",
+                "1",
+                "--replication-factor",
+                "1",
+                "--topic",
+                kafkaTopicName);
         assertTrue(
-            execResult.getStdout().contains("Created topic"),
-            execResult.getStdout());
+                execResult.getStdout().contains("Created topic"),
+                execResult.getStdout());
 
         kafkaConsumer = new KafkaConsumer<>(
-            ImmutableMap.of(
-                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
-                ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
-                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
-            ),
-            new StringDeserializer(),
-            new StringDeserializer()
+                ImmutableMap.of(
+                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceContainer.getBootstrapServers(),
+                        ConsumerConfig.GROUP_ID_CONFIG, "sink-test-" + randomName(8),
+                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
+                ),
+                new StringDeserializer(),
+                new StringDeserializer()
         );
         kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
         log.info("Successfully subscribe to kafka topic {}", kafkaTopicName);
@@ -108,7 +106,7 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
         while (kvIter.hasNext()) {
             ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1L));
             log.info("Received {} records from kafka topic {}",
-                records.count(), kafkaTopicName);
+                    records.count(), kafkaTopicName);
             if (records.isEmpty()) {
                 continue;
             }
@@ -122,4 +120,12 @@ public class KafkaSinkTester extends SinkTester<KafkaContainer> {
             }
         }
     }
+
+    @Override
+    public void close() throws Exception {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+            kafkaConsumer = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index e0cb2f15eee..07aa1675750 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -270,4 +270,12 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> {
         private Set<Long> set1;
         private Map<String, String> map1;
     }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 3c2efa4c06b..dadf6d295a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -73,5 +73,12 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester {
         });
     }
 
-
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (elasticClient != null) {
+            elasticClient.close();
+            elasticClient = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
index fcf4b08fb90..ddb5bf0c04e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/SinkTester.java
@@ -34,7 +34,7 @@ import org.testng.collections.Maps;
  * A tester used for testing a specific sink.
  */
 @Getter
-public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
+public abstract class SinkTester<ServiceContainerT extends GenericContainer> implements AutoCloseable {
 
     @Getter
     public enum SinkType {
@@ -127,6 +127,4 @@ public abstract class SinkTester<ServiceContainerT extends GenericContainer> {
                     .send();
         }
     }
-
-
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
index 238fe7000b4..b7ec88909fc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/KafkaSourceTester.java
@@ -142,4 +142,12 @@ public class KafkaSourceTester extends SourceTester<KafkaContainer> {
         log.info("Successfully produced {} messages to kafka topic {}", numMessages, kafkaTopicName);
         return kvs;
     }
+
+    @Override
+    public void close() throws Exception {
+        if (kafkaConsumer != null) {
+            kafkaConsumer.close();
+            kafkaConsumer = null;
+        }
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 97613cb23db..0e4e1786ba0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -39,7 +39,7 @@ import org.testng.collections.Maps;
  */
 @Getter
 @Slf4j
-public abstract class SourceTester<ServiceContainerT extends GenericContainer> {
+public abstract class SourceTester<ServiceContainerT extends GenericContainer> implements AutoCloseable {
 
     public static final String INSERT = "INSERT";
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 110ff11c00d..1dbefe10773 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
+import java.util.Map;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
@@ -25,11 +26,8 @@ import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.io.sources.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
-import java.io.Closeable;
-import java.util.Map;
-
 @Slf4j
-public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> implements Closeable {
+public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbContainer> {
 
     private static final String NAME = "debezium-mongodb";
 
@@ -118,7 +116,10 @@ public class DebeziumMongoDbSourceTester extends SourceTester<DebeziumMongoDbCon
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+            if (debeziumMongoDbContainer != null) {
+                pulsarCluster.stopService(DebeziumMongoDbContainer.NAME, debeziumMongoDbContainer);
+                debeziumMongoDbContainer = null;
+            }
         }
     }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index 5894d4f0ecb..d13e10e66fc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
+import java.util.Map;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -30,14 +31,11 @@ import org.testcontainers.shaded.com.google.common.base.Preconditions;
 import org.testng.Assert;
 import org.testng.util.Strings;
 
-import java.io.Closeable;
-import java.util.Map;
-
 /**
  * A tester for testing Debezium Microsoft SQl Server source.
  */
 @Slf4j
-public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContainer> implements Closeable {
+public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContainer> {
 
     private static final String NAME = "debezium-mssql";
 
@@ -158,7 +156,10 @@ public class DebeziumMsSqlSourceTester extends SourceTester<DebeziumMsSqlContain
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            PulsarCluster.stopService(DebeziumMsSqlContainer.NAME, debeziumMsSqlContainer);
+            if (debeziumMsSqlContainer != null) {
+                PulsarCluster.stopService(DebeziumMsSqlContainer.NAME, debeziumMsSqlContainer);
+                debeziumMsSqlContainer = null;
+            }
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 7958fa01992..b56f2dea7f2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
-import java.io.Closeable;
 import java.util.Map;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -38,7 +36,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
  * which is a MySQL database server preconfigured with an inventory database.
  */
 @Slf4j
-public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> implements Closeable {
+public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContainer> {
 
     private static final String NAME = "debezium-mysql";
 
@@ -128,7 +126,10 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
+            if (debeziumMySqlContainer != null) {
+                pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer);
+                debeziumMySqlContainer = null;
+            }
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 40078d67365..115d3eba8ff 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
+import java.util.Map;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -29,14 +30,11 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.shaded.com.google.common.base.Preconditions;
 import org.testng.util.Strings;
 
-import java.io.Closeable;
-import java.util.Map;
-
 /**
  * A tester for testing Debezium OracleDb source.
  */
 @Slf4j
-public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> implements Closeable {
+public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbContainer> {
 
     private static final String NAME = "debezium-oracle";
     private static final long SLEEP_AFTER_COMMAND_MS = 30_000;
@@ -247,7 +245,10 @@ public class DebeziumOracleDbSourceTester extends SourceTester<DebeziumOracleDbC
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+            if (debeziumOracleDbContainer != null) {
+                PulsarCluster.stopService(DebeziumOracleDbContainer.NAME, debeziumOracleDbContainer);
+                debeziumOracleDbContainer = null;
+            }
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
index 318c57fab63..1f0a7fe3a59 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
@@ -27,10 +29,6 @@ import org.apache.pulsar.tests.integration.io.sources.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.Assert;
 
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * A tester for testing Debezium Postgresql source.
  *
@@ -41,7 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * which is a Postgresql database server preconfigured with an inventory database.
  */
 @Slf4j
-public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgreSqlContainer> implements Closeable {
+public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgreSqlContainer> {
 
     private static final String NAME = "debezium-postgres";
 
@@ -155,7 +153,10 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre
     @Override
     public void close() {
         if (pulsarCluster != null) {
-            PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
+            if (debeziumPostgresqlContainer != null) {
+                PulsarCluster.stopService(DebeziumPostgreSqlContainer.NAME, debeziumPostgresqlContainer);
+                debeziumPostgresqlContainer = null;
+            }
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
index da1e7597ea6..bb2797b89ab 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarIODebeziumSourceRunner.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.tests.integration.io.sources.debezium;
 
 import com.google.common.base.Preconditions;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.failsafe.Failsafe;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -28,10 +31,6 @@ import org.apache.pulsar.tests.integration.io.sources.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.GenericContainer;
 
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.failsafe.Failsafe;
-
 @Slf4j
 public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
 
@@ -63,7 +62,16 @@ public class PulsarIODebeziumSourceRunner extends PulsarIOSourceRunner {
     @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public <T extends GenericContainer> void testSource(SourceTester<T> sourceTester)  throws Exception {
-           // prepare the testing environment for source
+        try {
+            internalTestSource(sourceTester);
+        } finally {
+            sourceTester.close();
+        }
+    }
+
+    private <T extends GenericContainer> void internalTestSource
+            (SourceTester<T> sourceTester) throws Exception {
+        // prepare the testing environment for source
         prepareSource(sourceTester);
 
         // submit the source connector