You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/13 10:26:16 UTC

flink git commit: [FLINK-5467] Avoid legacy state for CheckpointedRestoring operators

Repository: flink
Updated Branches:
  refs/heads/master 46423b9c7 -> 51a357351


[FLINK-5467] Avoid legacy state for CheckpointedRestoring operators

This closes #3102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a35735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a35735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a35735

Branch: refs/heads/master
Commit: 51a357351b955844941edd9a9b1406cdc787b18a
Parents: 46423b9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Jan 12 12:24:34 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 13 11:20:53 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractUdfStreamOperator.java   | 6 +++---
 .../org/apache/flink/test/checkpointing/RescalingITCase.java | 8 +++++++-
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 81f709b..15e26c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -195,14 +195,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 			} catch (Exception e) {
 				throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
 			}
-		} else if (userFunction instanceof CheckpointedRestoring) {
-			out.write(0);
 		}
 	}
 
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
-		if (userFunction instanceof CheckpointedRestoring) {
+		if (userFunction instanceof Checkpointed ||
+				(userFunction instanceof CheckpointedRestoring && in instanceof Migration)) {
 			@SuppressWarnings("unchecked")
 			CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction;
 
@@ -219,6 +218,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 				}
 			}
 		} else if (in instanceof Migration) {
+			// absorb the introduced byte from the migration stream without too much further consequences
 			int hasUdfState = in.read();
 			if (hasUdfState == 1) {
 				throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");

http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bc65abf..da4a01b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -903,7 +904,7 @@ public class RescalingITCase extends TestLogger {
 		}
 	}
 
-	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction {
+	private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
 
 		private static final long serialVersionUID = -359715965103593462L;
 		private static final int NUM_PARTITIONS = 7;
@@ -945,5 +946,10 @@ public class RescalingITCase extends TestLogger {
 				CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 			}
 		}
+
+		@Override
+		public void restoreState(Integer state) throws Exception {
+			counterPartitions.add(state);
+		}
 	}
 }