You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/10 15:05:48 UTC

flink git commit: [FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8

Repository: flink
Updated Branches:
  refs/heads/master abc1657ba -> 1836e08f0


[FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8

This closes #2397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1836e08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1836e08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1836e08f

Branch: refs/heads/master
Commit: 1836e08f0f8bb17ed50e59b42d02999436b36f6c
Parents: abc1657
Author: George <ne...@yahoo.com>
Authored: Wed Oct 5 11:48:02 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Oct 10 17:05:11 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java  | 37 +++++++++++-
 .../connectors/kafka/KafkaConsumer08Test.java   | 59 ++++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index d7a6364..0aacccd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -42,7 +42,10 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
 
+import java.net.InetAddress;
 import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -237,7 +240,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
 		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
-
+		
 		checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		String[] seedBrokers = seedBrokersConfString.split(",");
 		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
@@ -290,6 +293,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 					}
 					break retryLoop; // leave the loop through the brokers
 				} catch (Exception e) {
+					//validates seed brokers in case of a ClosedChannelException
+					validateSeedBrokers(seedBrokers, e);
 					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
 							"" + e.getClass() + ". Message: " + e.getMessage());
 					LOG.debug("Detailed trace", e);
@@ -348,6 +353,36 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		}
 	}
 
+	/**
+	 * Validate that at least one seed broker is valid in case of a
+	 * ClosedChannelException.
+	 * 
+	 * @param seedBrokers
+	 *            array containing the seed brokers e.g. ["host1:port1",
+	 *            "host2:port2"]
+	 * @param exception
+	 *            instance
+	 */
+	private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
+		if (!(exception instanceof ClosedChannelException)) {
+			return;
+		}
+		int unknownHosts = 0;
+		for (String broker : seedBrokers) {
+			URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
+			try {
+				InetAddress.getByName(brokerUrl.getHost());
+			} catch (UnknownHostException e) {
+				unknownHosts++;
+			}
+		}
+		// throw meaningful exception if all the provided hosts are invalid
+		if (unknownHosts == seedBrokers.length) {
+			throw new IllegalArgumentException("All the servers provided in: '"
+					+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
+		}
+	}
+
 	private static long getInvalidOffsetBehavior(Properties config) {
 		final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
 		if (val.equals("none")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1836e08f/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
index f0b58cf..9520f55 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Collections;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.Test;
 
 public class KafkaConsumer08Test {
 
@@ -89,4 +89,51 @@ public class KafkaConsumer08Test {
 			assertTrue(e.getMessage().contains("Unable to retrieve any partitions"));
 		}
 	}
+
+	@Test
+	public void testAllBoostrapServerHostsAreInvalid() {
+		try {
+			String zookeeperConnect = "localhost:56794";
+			String bootstrapServers = "indexistentHost:11111";
+			String groupId = "non-existent-group";
+			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+					new SimpleStringSchema(), props);
+			consumer.open(new Configuration());
+			fail();
+		} catch (Exception e) {
+			assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!",
+					e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+							+ "' config are invalid"));
+		}
+	}
+
+	@Test
+	public void testAtLeastOneBootstrapServerHostIsValid() {
+		try {
+			String zookeeperConnect = "localhost:56794";
+			// we declare one valid boostrap server, namely the one with
+			// 'localhost'
+			String bootstrapServers = "indexistentHost:11111, localhost:22222";
+			String groupId = "non-existent-group";
+			Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId);
+			FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"),
+					new SimpleStringSchema(), props);
+			consumer.open(new Configuration());
+			fail();
+		} catch (Exception e) {
+			// test is not failing because we have one valid boostrap server
+			assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!",
+					!e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+							+ " config are invalid"));
+		}
+	}
+	
+	private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) {
+		Properties props = new Properties();
+		props.setProperty("zookeeper.connect", zookeeperConnect);
+		props.setProperty("bootstrap.servers", bootstrapServers);
+		props.setProperty("group.id", groupId);
+		return props;
+	}
 }