You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 11:42:51 UTC

flink git commit: [FLINK-7595] [Savepoints] Allow removing stateless operators

Repository: flink
Updated Branches:
  refs/heads/master f4e4cd6cb -> 80348d653


[FLINK-7595] [Savepoints] Allow removing stateless operators

This closes #4651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80348d65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80348d65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80348d65

Branch: refs/heads/master
Commit: 80348d653b48e1b7d6a0b9275dbfa510eaea151f
Parents: f4e4cd6
Author: zentol <ch...@apache.org>
Authored: Wed Sep 6 15:38:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 12:42:24 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/savepoint/SavepointLoader.java   | 12 +++-
 .../savepoint/SavepointLoaderTest.java          | 16 ++++--
 .../AbstractOperatorRestoreTestBase.java        | 11 +++-
 ...AbstractNonKeyedOperatorRestoreTestBase.java |  7 ++-
 .../ChainLengthStatelessDecreaseTest.java       | 59 ++++++++++++++++++++
 5 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 38db7c2..31d9124 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -120,14 +121,19 @@ public class SavepointLoader {
 			} else if (allowNonRestoredState) {
 				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
 			} else {
-				String msg = String.format("Failed to rollback to savepoint %s. " +
+				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
+					if (operatorSubtaskState.hasState()) {
+						String msg = String.format("Failed to rollback to savepoint %s. " +
 								"Cannot map savepoint state for operator %s to the new program, " +
 								"because the operator is not available in the new program. If " +
 								"you want to allow to skip this, you can set the --allowNonRestoredState " +
 								"option on the CLI.",
-						savepointPath, operatorState.getOperatorID());
+							savepointPath, operatorState.getOperatorID());
 
-				throw new IllegalStateException(msg);
+						throw new IllegalStateException(msg);
+					}
+				}
+				LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 331621d..a461569 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -22,9 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -59,10 +63,14 @@ public class SavepointLoaderTest {
 		JobVertexID jobVertexID = new JobVertexID();
 		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 
-		OperatorState state = mock(OperatorState.class);
-		when(state.getParallelism()).thenReturn(parallelism);
-		when(state.getOperatorID()).thenReturn(operatorID);
-		when(state.getMaxParallelism()).thenReturn(parallelism);
+		OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+			new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
+			null,
+			null,
+			null);
+
+		OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
+		state.putState(0, subtaskState);
 
 		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
 		taskStates.put(operatorID, state);

http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7488b62..c86f21f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -89,6 +89,15 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 	private static ActorGateway taskManager = null;
 
 	private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
+	private final boolean allowNonRestoredState;
+
+	protected AbstractOperatorRestoreTestBase() {
+		this(true);
+	}
+
+	protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
+		this.allowNonRestoredState = allowNonRestoredState;
+	}
 
 	@BeforeClass
 	public static void beforeClass() {
@@ -238,7 +247,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 	private void restoreJob(String savepointPath) throws Exception {
 		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
-		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
+		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
 
 		Object msg;
 		Object result;

http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index f07bd4d..c100dc9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -51,7 +51,12 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
 			"nonKeyed-flink1.3");
 	}
 
-	public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+	protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
+	protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, boolean allowNonRestoredState) {
+		super(allowNonRestoredState);
 		this.savepointPath = savepointPath;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
new file mode 100644
index 0000000..fb4dcf5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change removes a stateless operator from a chain.
+ *
+ * <p>The restore runs with {@code allowNonRestoredState} set to {@code false}, so this test specifically checks that
+ * stateless operators can be removed even if all states from the previous job must be restored.
+ */
+public class ChainLengthStatelessDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	public ChainLengthStatelessDecreaseTest(String savepointPath) {
+		super(savepointPath, false); // second argument is allowNonRestoredState: disallow skipping savepoint state
+	}
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/*
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+		second.startNewChain();
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, second); // no startNewChain(): stays chained to 'second'; the stateless Map of the original job is omitted
+	}
+}