You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/04 04:03:44 UTC
[flink-table-store] branch master updated: [FLINK-28335] Delete topic after tests
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c24d3d24 [FLINK-28335] Delete topic after tests
c24d3d24 is described below
commit c24d3d244b4ae00e599924fa8814498f98f74aab
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Mon Jul 4 12:03:39 2022 +0800
[FLINK-28335] Delete topic after tests
This closes #188
---
.../org/apache/flink/table/store/kafka/KafkaTableTestBase.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
index db928329..1284fcff 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -53,6 +53,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/** Base class for Kafka Table IT Cases. */
@@ -110,9 +111,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
}
@After
- public void after() {
+ public void after() throws ExecutionException, InterruptedException {
// Cancel timer for debug logging
cancelTimeoutLogger();
+ // Delete topics to avoid reusing topics of the Kafka cluster
+ deleteTopics();
}
public Properties getStandardProps() {
@@ -169,6 +172,11 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
}
}
+ /**
+ * Deletes every topic currently listed by the Kafka cluster, using an admin client built
+ * from {@link #getStandardProps()}, and blocks until the deletion has completed.
+ *
+ * @throws ExecutionException if listing or deleting the topics fails
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ */
+ private void deleteTopics() throws ExecutionException, InterruptedException {
+ // try-with-resources closes the admin client even when listing or deleting fails,
+ // so a new client is not leaked on every call.
+ try (AdminClient adminClient = AdminClient.create(getStandardProps())) {
+ adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+ }
+ }
+
// ------------------------ For Debug Logging Purpose ----------------------------------
private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {