You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/29 15:57:42 UTC

flink git commit: [FLINK-3081] Properly stop periodic Kafka committer

Repository: flink
Updated Branches:
  refs/heads/master e9a2bc9d0 -> a997dd615


[FLINK-3081] Properly stop periodic Kafka committer

This closes #1410


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a997dd61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a997dd61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a997dd61

Branch: refs/heads/master
Commit: a997dd615598650934f0b785cbe8a6468ea63481
Parents: e9a2bc9
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 26 14:25:54 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Sun Nov 29 15:56:23 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumer.java  | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a997dd61/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index 446648f..e42faef 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -414,14 +414,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				// same here.
 				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
 				offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+				offsetCommitter.setDaemon(true);
 				offsetCommitter.start();
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
 			}
 
-			fetcher.run(sourceContext, deserializer, lastOffsets);
-
-			if (offsetCommitter != null) {
-				offsetCommitter.close();
+			try {
+				fetcher.run(sourceContext, deserializer, lastOffsets);
+			} finally {
+				if (offsetCommitter != null) {
+					offsetCommitter.close();
+					offsetCommitter.join();
+				}
 			}
 		}
 		else {