Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:09 UTC

[03/13] flink git commit: [FLINK-6353] Fix legacy user-state restore from 1.2

[FLINK-6353] Fix legacy user-state restore from 1.2

State that was checkpointed using Checkpointed (on a user function)
could not be restored using CheckpointedRestoring when the savepoint
was done on Flink 1.2. The reason was an overzealous check in
AbstractUdfStreamOperator that only restored from "legacy" operator
state using CheckpointedRestoring when the stream was a Migration
stream.

This removes that check, but we still need to make sure to consume the
byte that indicates whether there is legacy state, which the migration
stream introduces when we're restoring from a Flink 1.1 savepoint.
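
As an illustration only, the flag handling after this change can be
modelled outside of Flink roughly as below. Migration and
MigrationStream here are hypothetical stand-ins defined in the sketch
itself, and consumeFlag is an invented helper, not a Flink method; only
the control flow mirrors the diff further down.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class UdfFlagSketch {

    /** Hypothetical stand-in for Flink's Migration marker interface. */
    public interface Migration {}

    /** Hypothetical stand-in for a stream that comes from a Flink 1.1 savepoint. */
    public static class MigrationStream extends ByteArrayInputStream implements Migration {
        public MigrationStream(byte[] data) {
            super(data);
        }
    }

    /**
     * Consumes the "has UDF state" flag byte at most once and returns how many
     * flag bytes were read. functionRestoresLegacyState models the check
     * "userFunction instanceof Checkpointed || userFunction instanceof CheckpointedRestoring".
     */
    public static int consumeFlag(InputStream in, boolean functionRestoresLegacyState) {
        try {
            boolean haveReadUdfStateFlag = false;
            int consumed = 0;

            if (functionRestoresLegacyState) {
                in.read(); // the flag byte; the real code deserializes state if it is 1
                haveReadUdfStateFlag = true;
                consumed++;
            }

            // Only absorb the byte here if the branch above has not already read it.
            if (in instanceof Migration && !haveReadUdfStateFlag) {
                in.read();
                consumed++;
            }
            return consumed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this model the flag is read exactly once whenever the function can
restore legacy state or the stream is a migration stream, and not at all
otherwise.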

After this fix, the procedure for a user to migrate a user function away
from the Checkpointed interface is this:

 - Perform a savepoint with the user function still implementing
   Checkpointed, shut down the job
 - Change the user function to implement CheckpointedRestoring
 - Restore from the previous savepoint; the user function has to move
   the state that is restored using CheckpointedRestoring to another
   type of state, e.g. operator state, using the OperatorStateStore
 - Perform another savepoint, shut down the job
 - Remove CheckpointedRestoring interface from user function
 - Restore from the second savepoint
 - Done.

If the CheckpointedRestoring interface is not removed as prescribed in
the last steps then a future restore of a new savepoint will fail
because Flink will try to read legacy operator state that is not there
anymore. The above steps also apply to Flink 1.3, when a user wants to
move away from the Checkpointed interface.
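
The hand-over of state between the two versions of the user function
might look roughly like the sketch below. It does not use the Flink
API: the nested CheckpointedRestoring interface is a local stand-in
with the same shape as Flink's, a plain List<Long> stands in for
operator list state obtained from the OperatorStateStore, and
TransitionalCounter, FinalCounter and migrate are hypothetical names
made up for this example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class MigrationSketch {

    /** Local stand-in with the same shape as Flink's CheckpointedRestoring. */
    public interface CheckpointedRestoring<T extends Serializable> {
        void restoreState(T state) throws Exception;
    }

    /** Transitional version: accepts legacy state and snapshots it as "operator state". */
    public static class TransitionalCounter implements CheckpointedRestoring<Long> {
        private long count;

        @Override
        public void restoreState(Long legacyCount) {
            // Legacy state from the first savepoint arrives here.
            count = legacyCount;
        }

        /** Stand-in for writing the value into operator list state on the second savepoint. */
        public List<Long> snapshotOperatorState() {
            List<Long> state = new ArrayList<>();
            state.add(count);
            return state;
        }
    }

    /** Final version: no CheckpointedRestoring, reads only the new kind of state. */
    public static class FinalCounter {
        private long count;

        public void initializeFromOperatorState(List<Long> restored) {
            count = 0;
            for (long part : restored) {
                count += part;
            }
        }

        public long getCount() {
            return count;
        }
    }

    /** Walks a legacy value through both restores and returns what the final version sees. */
    public static long migrate(long legacyCount) {
        TransitionalCounter transitional = new TransitionalCounter();
        transitional.restoreState(legacyCount);                              // restore from first savepoint
        List<Long> secondSavepoint = transitional.snapshotOperatorState();   // perform second savepoint

        FinalCounter finalVersion = new FinalCounter();
        finalVersion.initializeFromOperatorState(secondSavepoint);           // restore from second savepoint
        return finalVersion.getCount();
    }
}
```

The point of the two classes is that the second savepoint contains only
the new kind of state, so the final version no longer needs
CheckpointedRestoring.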


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cced07a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cced07a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cced07a2

Branch: refs/heads/master
Commit: cced07a28622016ca1ee2d5b316423701c9a986c
Parents: 72dfce4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Apr 21 11:43:53 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractUdfStreamOperator.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cced07a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 2fa1e38..14857de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -154,12 +154,14 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
+		boolean haveReadUdfStateFlag = false;
 		if (userFunction instanceof Checkpointed ||
-				(userFunction instanceof CheckpointedRestoring && in instanceof Migration)) {
+				(userFunction instanceof CheckpointedRestoring)) {
 			@SuppressWarnings("unchecked")
 			CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction;
 
 			int hasUdfState = in.read();
+			haveReadUdfStateFlag = true;
 
 			if (hasUdfState == 1) {
 				Serializable functionState = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
@@ -171,7 +173,9 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 					}
 				}
 			}
-		} else if (in instanceof Migration) {
+		}
+
+		if (in instanceof Migration && !haveReadUdfStateFlag) {
 			// absorb the introduced byte from the migration stream without too much further consequences
 			int hasUdfState = in.read();
 			if (hasUdfState == 1) {