You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/03/27 15:51:31 UTC
[kafka] branch trunk updated: MINOR: optimize integration test shutdown (#8366)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f5ad46 MINOR: optimize integration test shutdown (#8366)
3f5ad46 is described below
commit 3f5ad4640bd7cf373fddcbbfe5ac5596d95c82c0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 27 10:50:43 2020 -0500
MINOR: optimize integration test shutdown (#8366)
* delete topics before tearing down multi-node clusters to avoid leader elections during shutdown
* tear down all nodes concurrently instead of sequentially
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../integration/utils/EmbeddedKafkaCluster.java | 21 ++++++++++++++++++++-
.../streams/integration/utils/KafkaEmbedded.java | 11 ++++++-----
2 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 4d53058..edabe24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -21,6 +21,7 @@ import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.test.TestCondition;
@@ -39,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
/**
* Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
@@ -118,8 +120,25 @@ public class EmbeddedKafkaCluster extends ExternalResource {
* Stop the Kafka cluster.
*/
private void stop() {
+ if (brokers.length > 1) {
+ // delete the topics first to avoid cascading leader elections while shutting down the brokers
+ final Set<String> topics = getAllTopicsInCluster();
+ if (!topics.isEmpty()) {
+ try (final Admin adminClient = brokers[0].createAdminClient()) {
+ adminClient.deleteTopics(topics).all().get();
+ } catch (final InterruptedException e) {
+ log.warn("Got interrupted while deleting topics in preparation for stopping embedded brokers", e);
+ throw new RuntimeException(e);
+ } catch (final ExecutionException | RuntimeException e) {
+ log.warn("Couldn't delete all topics before stopping brokers", e);
+ }
+ }
+ }
+ for (final KafkaEmbedded broker : brokers) {
+ broker.stopAsync();
+ }
for (final KafkaEmbedded broker : brokers) {
- broker.stop();
+ broker.awaitStoppedAndPurge();
}
zookeeper.shutdown();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 43438a5..78b1ee1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -123,14 +123,15 @@ public class KafkaEmbedded {
return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
}
- /**
- * Stop the broker.
- */
@SuppressWarnings("WeakerAccess")
- public void stop() {
+ public void stopAsync() {
log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
- brokerList(), zookeeperConnect());
+ brokerList(), zookeeperConnect());
kafka.shutdown();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public void awaitStoppedAndPurge() {
kafka.awaitShutdown();
log.debug("Removing log dir at {} ...", logDir);
try {