You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/30 16:14:47 UTC
flink git commit: [FLINK-3061] Properly fail Kafka Consumer if broker
is not available
Repository: flink
Updated Branches:
refs/heads/master 3e9d33ee5 -> 209ae6c91
[FLINK-3061] Properly fail Kafka Consumer if broker is not available
This closes #1395
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/209ae6c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/209ae6c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/209ae6c9
Branch: refs/heads/master
Commit: 209ae6c916e1bff7126b074dfe831bbc7b113e4a
Parents: 3e9d33e
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 23 17:57:26 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Nov 30 16:04:36 2015 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer.java | 11 +++++++-
.../connectors/kafka/KafkaConsumerTestBase.java | 29 ++++++++++++++++++++
.../streaming/connectors/kafka/KafkaITCase.java | 5 ++++
3 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e42faef..2d1d91a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -306,6 +306,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
// Connect to a broker to get the partitions
List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
+ if (partitionInfos.size() == 0) {
+ throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
+ "Please check previous log entries");
+ }
+
// get initial partitions list. The order of the partitions is important for consistent
// partition id assignment in restart cases.
this.partitions = new int[partitionInfos.size()];
@@ -424,7 +429,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
} finally {
if (offsetCommitter != null) {
offsetCommitter.close();
- offsetCommitter.join();
+ try {
+ offsetCommitter.join();
+ } catch(InterruptedException ie) {
+ // ignore interrupt
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 48f4c50..2116c01 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -123,6 +123,35 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// select which tests to run.
// ------------------------------------------------------------------------
+
+ /**
+ * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
+ * and a wrong broker was specified
+ *
+ * @throws Exception
+ */
+ public void runFailOnNoBrokerTest() throws Exception {
+ try {
+ Properties properties = new Properties();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ see.getConfig().disableSysoutLogging();
+ see.setNumberOfExecutionRetries(0);
+ see.setParallelism(1);
+
+ // use wrong ports for the consumers
+ properties.setProperty("bootstrap.servers", "localhost:80");
+ properties.setProperty("zookeeper.connect", "localhost:80");
+ properties.setProperty("group.id", "test");
+ FlinkKafkaConsumer<String> source = getConsumer("doesntexist", new SimpleStringSchema(), properties);
+ DataStream<String> stream = see.addSource(source);
+ stream.print();
+ see.execute("No broker test");
+ } catch(RuntimeException re){
+ Assert.assertTrue("Wrong RuntimeException thrown",
+ re.getMessage().contains("Unable to retrieve any partitions for topic"));
+ }
+ }
/**
* Test that validates that checkpointing and checkpoint notification works properly
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 3ca7c5c..5f2cdbc 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -40,6 +40,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
runCheckpointingTest();
}
+ @Test()
+ public void testFailOnNoBroker() throws Exception {
+ runFailOnNoBrokerTest();
+ }
+
@Test
public void testOffsetInZookeeper() throws Exception {
runOffsetInZookeeperValidationTest();