You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/19 18:55:55 UTC

[kafka] branch 2.5 updated: KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (#8673)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new b85bae4  KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (#8673)
b85bae4 is described below

commit b85bae4b53cc3a6ce55d3687f4b0fe4bc5a5569f
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Tue May 19 20:38:35 2020 +0200

    KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (#8673)
    
    Fixes EmbeddedKafkaCluster.deleteTopicAndWait for use with kafka_2.13
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, John Roesler <vv...@apache.org>
---
 .../streams/integration/utils/EmbeddedKafkaCluster.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index f30eced..69c533a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -27,7 +27,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -273,7 +272,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
      */
     public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
-        final Set<String> topics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+        final Set<String> topics = getAllTopicsInCluster();
         for (final String topic : topics) {
             try {
                 brokers[0].deleteTopic(topic);
@@ -312,8 +311,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         @Override
         public boolean conditionMet() {
-            final Set<String> allTopics = new HashSet<>(
-                    JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
+            final Set<String> allTopics = getAllTopicsInCluster();
             return !allTopics.removeAll(deletedTopics);
         }
     }
@@ -327,7 +325,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         @Override
         public boolean conditionMet() {
-            final Set<String> allTopics = JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+            final Set<String> allTopics = getAllTopicsInCluster();
             return allTopics.equals(remainingTopics);
         }
     }
@@ -339,4 +337,13 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
         return servers;
     }
+
+    public Set<String> getAllTopicsInCluster() {
+        final scala.collection.Iterator<String> topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster().iterator();
+        final Set<String> topics = new HashSet<>();
+        while (topicsIterator.hasNext()) {
+            topics.add(topicsIterator.next());
+        }
+        return topics;
+    }
 }