You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/05/20 01:16:48 UTC
[flink] branch master updated: [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 51a0d42 [FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
51a0d42 is described below
commit 51a0d42ade8ee3789036ac1ee7c121133b58212a
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Tue May 19 23:44:17 2020 +0800
[FLINK-12030][connector/kafka] Check the topic existence after topic creation using KafkaConsumer.
---
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
2 files changed, 38 insertions(+), 48 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 322c3aa..9ae751b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -322,32 +324,25 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// validate that the topic has been created
final long deadline = System.nanoTime() + 30_000_000_000L;
- do {
- try {
- if (config.isSecureMode()) {
- //increase wait time since in Travis ZK timeout occurs frequently
- int wait = zkTimeout / 100;
- LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
- Thread.sleep(wait);
- } else {
- Thread.sleep(100);
+ boolean topicCreated = false;
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerConnectionString());
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.putAll(getSecureProperties());
+ try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ do {
+ topicCreated = !consumer.partitionsFor(topic).isEmpty();
+ if (!topicCreated) {
+ Thread.sleep(10);
}
- } catch (InterruptedException e) {
- // restore interrupted state
- }
- // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
- // not always correct.
-
- // create a new ZK utils connection
- ZkUtils checkZKConn = getZkUtils();
- if (AdminUtils.topicExists(checkZKConn, topic)) {
- checkZKConn.close();
- return;
- }
- checkZKConn.close();
+ } while (!topicCreated && System.nanoTime() < deadline);
+ } catch (InterruptedException e) {
+ // do nothing.
+ }
+ if (!topicCreated) {
+ fail("Test topic could not be created");
}
- while (System.nanoTime() < deadline);
- fail("Test topic could not be created");
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 478ce38..c846f16 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,32 +159,25 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
// validate that the topic has been created
final long deadline = System.nanoTime() + 30_000_000_000L;
- do {
- try {
- if (config.isSecureMode()) {
- //increase wait time since in Travis ZK timeout occurs frequently
- int wait = zkTimeout / 100;
- LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
- Thread.sleep(wait);
- } else {
- Thread.sleep(100);
+ boolean topicCreated = false;
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerConnectionString());
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.putAll(getSecureProperties());
+ try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ do {
+ topicCreated = !consumer.partitionsFor(topic).isEmpty();
+ if (!topicCreated) {
+ Thread.sleep(10);
}
- } catch (InterruptedException e) {
- // restore interrupted state
- }
- // we could use AdminUtils.topicExists(zkUtils, topic) here, but it's results are
- // not always correct.
-
- // create a new ZK utils connection
- ZkUtils checkZKConn = getZkUtils();
- if (AdminUtils.topicExists(checkZKConn, topic)) {
- checkZKConn.close();
- return;
- }
- checkZKConn.close();
+ } while (!topicCreated && System.nanoTime() < deadline);
+ } catch (InterruptedException e) {
+ // do nothing.
+ }
+ if (!topicCreated) {
+ fail("Test topic could not be created");
}
- while (System.nanoTime() < deadline);
- fail("Test topic could not be created");
}
@Override