Posted to commits@flink.apache.org by se...@apache.org on 2016/06/08 13:17:48 UTC

[3/6] flink git commit: [FLINK-4000] [RabbitMQ] Fix for checkpoint state restore at MessageAcknowledgingSourceBase

[FLINK-4000] [RabbitMQ] Fix for checkpoint state restore at MessageAcknowledgingSourceBase

The documentation for MessageAcknowledgingSourceBase.restoreState() says:

This method is invoked when a function is executed as part of a recovery run. Note that restoreState() is called before open().

So the current implementation:

1. Fails in restoreState() with a NullPointerException, so jobs fail to restart.
2. Does not restore anything, because the subsequent open() call immediately erases all checkpoint data.
3. As a consequence, violates the exactly-once guarantee, because the set of processed but not yet acknowledged IDs is erased.

The proposed change fixes this.
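
As a rough illustration of the ordering issue described above, here is a minimal stand-alone Java sketch. It is not the Flink class: SketchSource, alreadyProcessed(), pendingCheckpointCount(), and the use of plain string lists as checkpoint state are assumptions made for illustration. Only the two field names and the null checks mirror the diff below.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RestoreBeforeOpenSketch {

    /** Hypothetical stand-in for the source function; not the real Flink class. */
    static class SketchSource {
        private ArrayDeque<List<String>> pendingCheckpoints;
        private Set<String> idsProcessedButNotAcknowledged;

        // On a recovery run this is called BEFORE open(), so the set must be
        // created here; otherwise addAll() below would hit a null reference.
        void restoreState(List<List<String>> state) {
            idsProcessedButNotAcknowledged = new HashSet<>();
            pendingCheckpoints = new ArrayDeque<>(state);
            for (List<String> checkpoint : pendingCheckpoints) {
                idsProcessedButNotAcknowledged.addAll(checkpoint);
            }
        }

        // Called after restoreState() on recovery, or on its own on a fresh start.
        // The null checks keep restored state from being overwritten.
        void open() {
            if (pendingCheckpoints == null) {
                pendingCheckpoints = new ArrayDeque<>();
            }
            if (idsProcessedButNotAcknowledged == null) {
                idsProcessedButNotAcknowledged = new HashSet<>();
            }
        }

        boolean alreadyProcessed(String id) {
            return idsProcessedButNotAcknowledged.contains(id);
        }

        int pendingCheckpointCount() {
            return pendingCheckpoints.size();
        }
    }

    public static void main(String[] args) {
        // Recovery run: restoreState() first, then open().
        SketchSource recovered = new SketchSource();
        recovered.restoreState(Arrays.asList(
                Arrays.asList("msg-1", "msg-2"),
                Arrays.asList("msg-3")));
        recovered.open();
        System.out.println("restored msg-1 known: " + recovered.alreadyProcessed("msg-1"));
        System.out.println("pending checkpoints: " + recovered.pendingCheckpointCount());

        // Fresh start: only open() is called, so empty collections are created.
        SketchSource fresh = new SketchSource();
        fresh.open();
        System.out.println("fresh msg-1 known: " + fresh.alreadyProcessed("msg-1"));
    }
}
```

If open() unconditionally reassigned both fields, as the removed lines in the diff did, the recovered instance in this sketch would lose its restored IDs and pending checkpoints as soon as open() ran.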

This closes #2062


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae679bb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae679bb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae679bb2

Branch: refs/heads/master
Commit: ae679bb2aa1e0e239770605e049709fbc6b9962c
Parents: 65ee28c
Author: Alexey Savartsov <as...@gmail.com>
Authored: Thu Jun 2 02:23:53 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../api/functions/source/MessageAcknowledgingSourceBase.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae679bb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index d3cbfb6..9b2c4ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -127,8 +127,10 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		idsForCurrentCheckpoint = new ArrayList<>(64);
-		pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
-		idsProcessedButNotAcknowledged = new HashSet<>();
+		if (pendingCheckpoints == null)
+			pendingCheckpoints = new ArrayDeque<>(numCheckpointsToKeep);
+		if (idsProcessedButNotAcknowledged == null)
+			idsProcessedButNotAcknowledged = new HashSet<>();
 	}
 
 	@Override
@@ -177,6 +179,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 
 	@Override
 	public void restoreState(SerializedCheckpointData[] state) throws Exception {
+		idsProcessedButNotAcknowledged = new HashSet<>();
 		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
 		// build a set which contains all processed ids. It may be used to check if we have
 		// already processed an incoming message.