You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/30 16:14:47 UTC

flink git commit: [FLINK-3061] Properly fail Kafka Consumer if broker is not available

Repository: flink
Updated Branches:
  refs/heads/master 3e9d33ee5 -> 209ae6c91


[FLINK-3061] Properly fail Kafka Consumer if broker is not available

This closes #1395


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/209ae6c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/209ae6c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/209ae6c9

Branch: refs/heads/master
Commit: 209ae6c916e1bff7126b074dfe831bbc7b113e4a
Parents: 3e9d33e
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 23 17:57:26 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Nov 30 16:04:36 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    | 11 +++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java | 29 ++++++++++++++++++++
 .../streaming/connectors/kafka/KafkaITCase.java |  5 ++++
 3 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e42faef..2d1d91a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -306,6 +306,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		// Connect to a broker to get the partitions
 		List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
 
+		if (partitionInfos.size() == 0) {
+			throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
+					"Please check previous log entries");
+		}
+
 		// get initial partitions list. The order of the partitions is important for consistent 
 		// partition id assignment in restart cases.
 		this.partitions = new int[partitionInfos.size()];
@@ -424,7 +429,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			} finally {
 				if (offsetCommitter != null) {
 					offsetCommitter.close();
-					offsetCommitter.join();
+					try {
+						offsetCommitter.join();
+					} catch(InterruptedException ie) {
+						// ignore interrupt
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 48f4c50..2116c01 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -123,6 +123,35 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	//  select which tests to run.
 	// ------------------------------------------------------------------------
 
+
+	/**
+	 * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
+	 * and a wrong broker was specified
+	 *
+	 * @throws Exception
+	 */
+	public void runFailOnNoBrokerTest() throws Exception {
+		try {
+			Properties properties = new Properties();
+
+			StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+			see.getConfig().disableSysoutLogging();
+			see.setNumberOfExecutionRetries(0);
+			see.setParallelism(1);
+
+			// use wrong ports for the consumers
+			properties.setProperty("bootstrap.servers", "localhost:80");
+			properties.setProperty("zookeeper.connect", "localhost:80");
+			properties.setProperty("group.id", "test");
+			FlinkKafkaConsumer<String> source = getConsumer("doesntexist", new SimpleStringSchema(), properties);
+			DataStream<String> stream = see.addSource(source);
+			stream.print();
+			see.execute("No broker test");
+		} catch(RuntimeException re){
+			Assert.assertTrue("Wrong RuntimeException thrown",
+					re.getMessage().contains("Unable to retrieve any partitions for topic"));
+		}
+	}
 	/**
 	 * Test that validates that checkpointing and checkpoint notification works properly
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 3ca7c5c..5f2cdbc 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -40,6 +40,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 		runCheckpointingTest();
 	}
 
+	@Test()
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
 	@Test
 	public void testOffsetInZookeeper() throws Exception {
 		runOffsetInZookeeperValidationTest();