You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:56 UTC

[3/6] flink git commit: [FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down

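[FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down

Exceptions thrown while committing offsets through the ZooKeeper offset handler are now rethrown only while the fetcher is still running; once the fetcher is shutting down they are ignored.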

This closes #3035


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7cda75b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7cda75b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7cda75b

Branch: refs/heads/master
Commit: e7cda75b8594417559d6aac6229b5893f5459f0f
Parents: 6342d6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 16:28:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer08.java         |  7 ++++---
 .../connectors/kafka/internals/Kafka08Fetcher.java     | 13 +++++++++++--
 .../streaming/connectors/kafka/Kafka08ITCase.java      |  7 -------
 3 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0aacccd..0f11c72 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -292,11 +292,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 						}
 					}
 					break retryLoop; // leave the loop through the brokers
-				} catch (Exception e) {
+				}
+				catch (Exception e) {
 					//validates seed brokers in case of a ClosedChannelException
 					validateSeedBrokers(seedBrokers, e);
-					LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
-							"" + e.getClass() + ". Message: " + e.getMessage());
+					LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+							seedBroker, topics, e.getClass().getName(), e.getMessage());
 					LOG.debug("Detailed trace", e);
 					// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
 					try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index d015157..5a0aed3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -347,8 +347,17 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 	public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
 		ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
 		if (zkHandler != null) {
-			// the ZK handler takes care of incrementing the offsets by 1 before committing
-			zkHandler.prepareAndCommitOffsets(offsets);
+			try {
+				// the ZK handler takes care of incrementing the offsets by 1 before committing
+				zkHandler.prepareAndCommitOffsets(offsets);
+			}
+			catch (Exception e) {
+				if (running) {
+					throw e;
+				} else {
+					return;
+				}
+			}
 		}
 
 		// Set committed offsets in topic partition state

http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index fabb0fe..0cdf465 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,19 +19,12 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;