You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 04:03:44 UTC

[flink-table-store] branch master updated: [FLINK-28335] Delete topic after tests

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c24d3d24 [FLINK-28335] Delete topic after tests
c24d3d24 is described below

commit c24d3d244b4ae00e599924fa8814498f98f74aab
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Mon Jul 4 12:03:39 2022 +0800

    [FLINK-28335] Delete topic after tests
    
    This closes #188
---
 .../org/apache/flink/table/store/kafka/KafkaTableTestBase.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index db928329..1284fcff 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -53,6 +53,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
@@ -110,9 +111,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
     }
 
     @After
-    public void after() {
+    public void after() throws ExecutionException, InterruptedException {
         // Cancel timer for debug logging
         cancelTimeoutLogger();
+        // Delete topics to avoid reusing topics of the Kafka cluster
+        deleteTopics();
     }
 
     public Properties getStandardProps() {
@@ -169,6 +172,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         }
     }
 
+    private void deleteTopics() throws ExecutionException, InterruptedException {
+        final AdminClient adminClient = AdminClient.create(getStandardProps());
+        adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+    }
+
     // ------------------------ For Debug Logging Purpose ----------------------------------
 
     private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {