You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/03/27 15:51:31 UTC

[kafka] branch trunk updated: MINOR: optimize integration test shutdown (#8366)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f5ad46  MINOR: optimize integration test shutdown (#8366)
3f5ad46 is described below

commit 3f5ad4640bd7cf373fddcbbfe5ac5596d95c82c0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 27 10:50:43 2020 -0500

    MINOR: optimize integration test shutdown (#8366)
    
    * delete topics before tearing down multi-node clusters to avoid leader elections during shutdown
    * tear down all nodes concurrently instead of sequentially
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../integration/utils/EmbeddedKafkaCluster.java     | 21 ++++++++++++++++++++-
 .../streams/integration/utils/KafkaEmbedded.java    | 11 ++++++-----
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 4d53058..edabe24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -21,6 +21,7 @@ import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.test.TestCondition;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
@@ -118,8 +120,25 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * Stop the Kafka cluster.
      */
     private void stop() {
+        if (brokers.length > 1) {
+            // delete the topics first to avoid cascading leader elections while shutting down the brokers
+            final Set<String> topics = getAllTopicsInCluster();
+            if (!topics.isEmpty()) {
+                try (final Admin adminClient = brokers[0].createAdminClient()) {
+                    adminClient.deleteTopics(topics).all().get();
+                } catch (final InterruptedException e) {
+                    log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
+                    throw new RuntimeException(e);
+                } catch (final ExecutionException | RuntimeException e) {
+                    log.warn("Couldn't delete all topics before stopping brokers", e);
+                }
+            }
+        }
+        for (final KafkaEmbedded broker : brokers) {
+            broker.stopAsync();
+        }
         for (final KafkaEmbedded broker : brokers) {
-            broker.stop();
+            broker.awaitStoppedAndPurge();
         }
         zookeeper.shutdown();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 43438a5..78b1ee1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -123,14 +123,15 @@ public class KafkaEmbedded {
         return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
     }
 
-    /**
-     * Stop the broker.
-     */
     @SuppressWarnings("WeakerAccess")
-    public void stop() {
+    public void stopAsync() {
         log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
-            brokerList(), zookeeperConnect());
+                  brokerList(), zookeeperConnect());
         kafka.shutdown();
+    }
+
+    @SuppressWarnings("WeakerAccess")
+    public void awaitStoppedAndPurge() {
         kafka.awaitShutdown();
         log.debug("Removing log dir at {} ...", logDir);
         try {