You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/13 07:53:30 UTC

[6/6] flink git commit: [hotfix] [kafka] Remove stale comment on publishing procedures of AbstractFetcher

[hotfix] [kafka] Remove stale comment on publishing procedures of AbstractFetcher

The previous comment stated that "only now will the fetcher return at
least the restored offsets when calling snapshotCurrentState()". This is
a remnant of the earlier fetcher initialization behaviour, in which the
fetcher was not directly seeded with the restored offsets on
instantiation.

Since the fetcher is now seeded with the restored offsets on
instantiation, this commit replaces the stale comment to avoid confusion.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c91544d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c91544d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c91544d1

Branch: refs/heads/release-1.4
Commit: c91544d140668a1b6c4c59909cc9374cc890ccf3
Parents: c4bfc7d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed Dec 20 11:54:40 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jan 13 11:34:28 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c91544d1/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 44e0d54..d71827f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -545,8 +545,12 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			sourceContext.markAsTemporarilyIdle();
 		}
 
-		// create the fetcher that will communicate with the Kafka brokers
-		final AbstractFetcher<T, ?> fetcher = createFetcher(
+		// from this point forward:
+		//   - 'snapshotState' will draw offsets from the fetcher,
+		//     instead of being built from `subscribedPartitionsToStartOffsets`
+		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
+		//     Kafka through the fetcher, if configured to do so)
+		this.kafkaFetcher = createFetcher(
 				sourceContext,
 				subscribedPartitionsToStartOffsets,
 				periodicWatermarkAssigner,
@@ -554,12 +558,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				(StreamingRuntimeContext) getRuntimeContext(),
 				offsetCommitMode);
 
-		// publish the reference, for snapshot-, commit-, and cancel calls
-		// IMPORTANT: We can only do that now, because only now will calls to
-		//            the fetchers 'snapshotCurrentState()' method return at least
-		//            the restored offsets
-		this.kafkaFetcher = fetcher;
-
 		if (!running) {
 			return;
 		}
@@ -598,7 +596,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 							// no need to add the discovered partitions if we were closed during the meantime
 							if (running && !discoveredPartitions.isEmpty()) {
-								fetcher.addDiscoveredPartitions(discoveredPartitions);
+								kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
 							}
 
 							// do not waste any time sleeping if we're not running anymore
@@ -621,7 +619,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			});
 
 			discoveryLoopThread.start();
-			fetcher.runFetchLoop();
+			kafkaFetcher.runFetchLoop();
 
 			// --------------------------------------------------------------------
 
@@ -638,7 +636,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			// won't be using the discoverer
 			partitionDiscoverer.close();
 
-			fetcher.runFetchLoop();
+			kafkaFetcher.runFetchLoop();
 		}
 	}