You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:56 UTC
[3/6] flink git commit: [FLINK-4905] [kafka 08 consumer] Suppress
offset committing failures when fetcher is shutting down
[FLINK-4905] [kafka 08 consumer] Suppress offset committing failures when fetcher is shutting down
This closes #3035
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7cda75b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7cda75b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7cda75b
Branch: refs/heads/master
Commit: e7cda75b8594417559d6aac6229b5893f5459f0f
Parents: 6342d6d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 16:28:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer08.java | 7 ++++---
.../connectors/kafka/internals/Kafka08Fetcher.java | 13 +++++++++++--
.../streaming/connectors/kafka/Kafka08ITCase.java | 7 -------
3 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 0aacccd..0f11c72 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -292,11 +292,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
}
}
break retryLoop; // leave the loop through the brokers
- } catch (Exception e) {
+ }
+ catch (Exception e) {
//validates seed brokers in case of a ClosedChannelException
validateSeedBrokers(seedBrokers, e);
- LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." +
- "" + e.getClass() + ". Message: " + e.getMessage());
+ LOG.warn("Error communicating with broker {} to find partitions for {}. {} Message: {}",
+ seedBroker, topics, e.getClass().getName(), e.getMessage());
LOG.debug("Detailed trace", e);
// we sleep a bit. Retrying immediately doesn't make sense in cases where Kafka is reorganizing the leader metadata
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index d015157..5a0aed3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -347,8 +347,17 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
- // the ZK handler takes care of incrementing the offsets by 1 before committing
- zkHandler.prepareAndCommitOffsets(offsets);
+ try {
+ // the ZK handler takes care of incrementing the offsets by 1 before committing
+ zkHandler.prepareAndCommitOffsets(offsets);
+ }
+ catch (Exception e) {
+ if (running) {
+ throw e;
+ } else {
+ return;
+ }
+ }
}
// Set committed offsets in topic partition state
http://git-wip-us.apache.org/repos/asf/flink/blob/e7cda75b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index fabb0fe..0cdf465 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,19 +19,12 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
-import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;