You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/05/20 01:16:48 UTC

[flink] branch master updated: [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 51a0d42  [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
51a0d42 is described below

commit 51a0d42ade8ee3789036ac1ee7c121133b58212a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Tue May 19 23:44:17 2020 +0800

    [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
 2 files changed, 38 insertions(+), 48 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 322c3aa..9ae751b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -322,32 +324,25 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 		// validate that the topic has been created
 		final long deadline = System.nanoTime() + 30_000_000_000L;
-		do {
-			try {
-				if (config.isSecureMode()) {
-					//increase wait time since in Travis ZK timeout occurs frequently
-					int wait = zkTimeout / 100;
-					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-					Thread.sleep(wait);
-				} else {
-					Thread.sleep(100);
+		boolean topicCreated = false;
+		Properties props = new Properties();
+		props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerConnectionString());
+		props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+		props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+		props.putAll(getSecureProperties());
+		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+			do {
+				topicCreated = !consumer.partitionsFor(topic).isEmpty();
+				if (!topicCreated) {
+					Thread.sleep(10);
 				}
-			} catch (InterruptedException e) {
-				// restore interrupted state
-			}
-			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
-			// not always correct.
-
-			// create a new ZK utils connection
-			ZkUtils checkZKConn = getZkUtils();
-			if (AdminUtils.topicExists(checkZKConn, topic)) {
-				checkZKConn.close();
-				return;
-			}
-			checkZKConn.close();
+			} while (!topicCreated && System.nanoTime() < deadline);
+		} catch (InterruptedException e) {
+			// interrupted while polling: stop waiting; the topicCreated check below then fails the test.
+		}
+		if (!topicCreated) {
+			fail("Test topic could not be created");
 		}
-		while (System.nanoTime() < deadline);
-		fail("Test topic could not be created");
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 478ce38..c846f16 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,32 +159,25 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 		// validate that the topic has been created
 		final long deadline = System.nanoTime() + 30_000_000_000L;
-		do {
-			try {
-				if (config.isSecureMode()) {
-					//increase wait time since in Travis ZK timeout occurs frequently
-					int wait = zkTimeout / 100;
-					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
-					Thread.sleep(wait);
-				} else {
-					Thread.sleep(100);
+		boolean topicCreated = false;
+		Properties props = new Properties();
+		props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerConnectionString());
+		props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+		props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+		props.putAll(getSecureProperties());
+		try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+			do {
+				topicCreated = !consumer.partitionsFor(topic).isEmpty();
+				if (!topicCreated) {
+					Thread.sleep(10);
 				}
-			} catch (InterruptedException e) {
-				// restore interrupted state
-			}
-			// we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
-			// not always correct.
-
-			// create a new ZK utils connection
-			ZkUtils checkZKConn = getZkUtils();
-			if (AdminUtils.topicExists(checkZKConn, topic)) {
-				checkZKConn.close();
-				return;
-			}
-			checkZKConn.close();
+			} while (!topicCreated && System.nanoTime() < deadline);
+		} catch (InterruptedException e) {
+			// interrupted while polling: stop waiting; the topicCreated check below then fails the test.
+		}
+		if (!topicCreated) {
+			fail("Test topic could not be created");
 		}
-		while (System.nanoTime() < deadline);
-		fail("Test topic could not be created");
 	}
 
 	@Override