You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/10 15:05:48 UTC
flink git commit: [FLINK-4439] Validate 'bootstrap.servers' config in
flink kafka consumer 0.8
Repository: flink
Updated Branches:
refs/heads/master abc1657ba -> 1836e08f0
[FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8
This closes #2397
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1836e08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1836e08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1836e08f
Branch: refs/heads/master
Commit: 1836e08f0f8bb17ed50e59b42d02999436b36f6c
Parents: abc1657
Author: George <ne...@yahoo.com>
Authored: Wed Oct 5 11:48:02 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Oct 10 17:05:11 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer08.java | 37 +++++++++++-
.../connectors/kafka/KafkaConsumer08Test.java | 59 ++++++++++++++++++--
2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index d7a6364..0aacccd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -42,7 +42,10 @@ import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
+import java.net.InetAddress;
import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -237,7 +240,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
+
checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
String[] seedBrokers = seedBrokersConfString.split(",");
List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
@@ -290,6 +293,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
break retryLoop; // leave the loop through the brokers
} catch (Exception e) {
+ // on a ClosedChannelException, fail fast if none of the seed broker hosts can be resolved
+ validateSeedBrokers(seedBrokers, e);
LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
"" + e.getClass() + ". Message: " + e.getMessage());
LOG.debug("Detailed trace", e);
@@ -348,6 +353,36 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
}
+ /**
+ * Validate that at least one seed broker is valid in case of a
+ * ClosedChannelException.
+ *
+ * @param seedBrokers
+ * array containing the seed brokers e.g. ["host1:port1",
+ * "host2:port2"]
+ * @param exception
+ * instance
+ */
+ private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
+ if (!(exception instanceof ClosedChannelException)) {
+ return;
+ }
+ int unknownHosts = 0;
+ for (String broker : seedBrokers) {
+ URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
+ try {
+ InetAddress.getByName(brokerUrl.getHost());
+ } catch (UnknownHostException e) {
+ unknownHosts++;
+ }
+ }
+ // throw meaningful exception if all the provided hosts are invalid
+ if (unknownHosts == seedBrokers.length) {
+ throw new IllegalArgumentException("All the servers provided in: '"
+ + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
+ }
+ }
+
private static long getInvalidOffsetBehavior(Properties config) {
final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
if (val.equals("none")) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index f0b58cf..9520f55 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -18,17 +18,17 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.Properties;
-import static org.junit.Assert.*;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
public class KafkaConsumer08Test {
@@ -89,4 +89,51 @@ public class KafkaConsumer08Test {
assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
}
}
+
+ /** Opening a consumer whose only bootstrap host is 'indexistentHost' must report all servers as invalid. */
+ @Test
+ public void testAllBoostrapServerHostsAreInvalid() {
+ final String expectedFragment = "All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+ + "' config are invalid";
+ try {
+ Properties kafkaProps = createKafkaProps("localhost:56794", "indexistentHost:11111", "non-existent-group");
+ FlinkKafkaConsumer08<String> source = new FlinkKafkaConsumer08<>(
+ Collections.singletonList("no op topic"), new SimpleStringSchema(), kafkaProps);
+ source.open(new Configuration());
+ fail();
+ } catch (Exception e) {
+ String actualMessage = e.getMessage();
+ assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
+ actualMessage.contains(expectedFragment));
+ }
+ }
+
+ /**
+ * With at least one resolvable bootstrap host ('localhost'), opening the consumer is still expected
+ * to fail, but not with the 'all servers invalid' validation error.
+ */
+ @Test
+ public void testAtLeastOneBootstrapServerHostIsValid() {
+ // must mirror the message thrown by FlinkKafkaConsumer08#validateSeedBrokers, otherwise the check is vacuous
+ final String allInvalidFragment = "All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+ + "' config are invalid";
+ try {
+ Properties props = createKafkaProps("localhost:56794", "indexistentHost:11111, localhost:22222", "non-existent-group");
+ FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+ new SimpleStringSchema(), props);
+ consumer.open(new Configuration());
+ fail();
+ } catch (Exception e) {
+ assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
+ e.getMessage() == null || !e.getMessage().contains(allInvalidFragment));
+ }
+ }
+
+ private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
+ Properties props = new Properties();
+ props.setProperty("zookeeper.connect", zookeeperConnect);
+ props.setProperty("bootstrap.servers", bootstrapServers);
+ props.setProperty("group.id", groupId);
+ return props;
+ }
}