You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/29 15:57:42 UTC
flink git commit: [FLINK-3081] Properly stop periodic Kafka committer
Repository: flink
Updated Branches:
refs/heads/master e9a2bc9d0 -> a997dd615
[FLINK-3081] Properly stop periodic Kafka committer
This closes #1410
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a997dd61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a997dd61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a997dd61
Branch: refs/heads/master
Commit: a997dd615598650934f0b785cbe8a6468ea63481
Parents: e9a2bc9
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 26 14:25:54 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Sun Nov 29 15:56:23 2015 +0100
----------------------------------------------------------------------
.../streaming/connectors/kafka/FlinkKafkaConsumer.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a997dd61/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index 446648f..e42faef 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -414,14 +414,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
// same here.
long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+ offsetCommitter.setDaemon(true);
offsetCommitter.start();
LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
}
- fetcher.run(sourceContext, deserializer, lastOffsets);
-
- if (offsetCommitter != null) {
- offsetCommitter.close();
+ try {
+ fetcher.run(sourceContext, deserializer, lastOffsets);
+ } finally {
+ if (offsetCommitter != null) {
+ offsetCommitter.close();
+ offsetCommitter.join();
+ }
}
}
else {