You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/07/22 12:01:56 UTC

[3/6] flink git commit: [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

[hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

This prevents concrete Kafka Source implementations from accidentally
overriding the checkpointing methods. This would be problematic when not
providing tests. We test the checkpoint methods of the ConsumerBase but
derived methods would not be tested.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ead25aa7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ead25aa7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ead25aa7

Branch: refs/heads/release-1.3
Commit: ead25aa7a591f28f675af509a5874a77f01713ac
Parents: b056432
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Jul 18 10:35:54 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Jul 22 19:37:09 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ead25aa7/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d9b75bb..199755c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -504,7 +504,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
+	public final void initializeState(FunctionInitializationContext context) throws Exception {
 
 		// we might have been restored via restoreState() which restores from legacy operator state
 		if (!restored) {
@@ -532,7 +532,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {
 		if (!running) {
 			LOG.debug("snapshotState() called on closed source");
 		} else {
@@ -577,7 +577,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 	}
 
 	@Override
-	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
+	public final void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
 		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
 			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());