You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/19 18:55:55 UTC
[kafka] branch 2.5 updated: KAFKA-9992: Eliminate JavaConverters in
EmbeddedKafkaCluster (#8673)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new b85bae4 KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (#8673)
b85bae4 is described below
commit b85bae4b53cc3a6ce55d3687f4b0fe4bc5a5569f
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Tue May 19 20:38:35 2020 +0200
KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (#8673)
Fixes EmbeddedKafkaCluster.deleteTopicAndWait for use with kafka_2.13
Reviewers: Boyang Chen <bo...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, John Roesler <vv...@apache.org>
---
.../streams/integration/utils/EmbeddedKafkaCluster.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index f30eced..69c533a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -27,7 +27,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
import java.io.IOException;
import java.util.ArrayList;
@@ -273,7 +272,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
*/
public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
- final Set<String> topics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+ final Set<String> topics = getAllTopicsInCluster();
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
@@ -312,8 +311,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@Override
public boolean conditionMet() {
- final Set<String> allTopics = new HashSet<>(
- JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
+ final Set<String> allTopics = getAllTopicsInCluster();
return !allTopics.removeAll(deletedTopics);
}
}
@@ -327,7 +325,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
@Override
public boolean conditionMet() {
- final Set<String> allTopics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+ final Set<String> allTopics = getAllTopicsInCluster();
return allTopics.equals(remainingTopics);
}
}
@@ -339,4 +337,13 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
return servers;
}
+
+ public Set<String> getAllTopicsInCluster() {
+ final scala.collection.Iterator<String> topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster().iterator();
+ final Set<String> topics = new HashSet<>();
+ while (topicsIterator.hasNext()) {
+ topics.add(topicsIterator.next());
+ }
+ return topics;
+ }
}