You are viewing a plain-text version of this content. The link to the canonical version ("here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 11:42:51 UTC
flink git commit: [FLINK-7595] [Savepoints] Allow removing stateless operators
Repository: flink
Updated Branches:
refs/heads/master f4e4cd6cb -> 80348d653
[FLINK-7595] [Savepoints] Allow removing stateless operators
This closes #4651.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80348d65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80348d65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80348d65
Branch: refs/heads/master
Commit: 80348d653b48e1b7d6a0b9275dbfa510eaea151f
Parents: f4e4cd6
Author: zentol <ch...@apache.org>
Authored: Wed Sep 6 15:38:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 12:42:24 2017 +0100
----------------------------------------------------------------------
.../checkpoint/savepoint/SavepointLoader.java | 12 +++-
.../savepoint/SavepointLoaderTest.java | 16 ++++--
.../AbstractOperatorRestoreTestBase.java | 11 +++-
...AbstractNonKeyedOperatorRestoreTestBase.java | 7 ++-
.../ChainLengthStatelessDecreaseTest.java | 59 ++++++++++++++++++++
5 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 38db7c2..31d9124 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -120,14 +121,19 @@ public class SavepointLoader {
} else if (allowNonRestoredState) {
LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
} else {
- String msg = String.format("Failed to rollback to savepoint %s. " +
+ for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
+ if (operatorSubtaskState.hasState()) {
+ String msg = String.format("Failed to rollback to savepoint %s. " +
"Cannot map savepoint state for operator %s to the new program, " +
"because the operator is not available in the new program. If " +
"you want to allow to skip this, you can set the --allowNonRestoredState " +
"option on the CLI.",
- savepointPath, operatorState.getOperatorID());
+ savepointPath, operatorState.getOperatorID());
- throw new IllegalStateException(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+ LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 331621d..a461569 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -22,9 +22,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -59,10 +63,14 @@ public class SavepointLoaderTest {
JobVertexID jobVertexID = new JobVertexID();
OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
- OperatorState state = mock(OperatorState.class);
- when(state.getParallelism()).thenReturn(parallelism);
- when(state.getOperatorID()).thenReturn(operatorID);
- when(state.getMaxParallelism()).thenReturn(parallelism);
+ OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+ new OperatorStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
+ null,
+ null,
+ null);
+
+ OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
+ state.putState(0, subtaskState);
Map<OperatorID, OperatorState> taskStates = new HashMap<>();
taskStates.put(operatorID, state);
http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7488b62..c86f21f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -89,6 +89,15 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
private static ActorGateway taskManager = null;
private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
+ private final boolean allowNonRestoredState;
+
+ protected AbstractOperatorRestoreTestBase() {
+ this(true);
+ }
+
+ protected AbstractOperatorRestoreTestBase(boolean allowNonRestoredState) {
+ this.allowNonRestoredState = allowNonRestoredState;
+ }
@BeforeClass
public static void beforeClass() {
@@ -238,7 +247,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
private void restoreJob(String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
- jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
+ jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));
Object msg;
Object result;
http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index f07bd4d..c100dc9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -51,7 +51,12 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
"nonKeyed-flink1.3");
}
- public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+ protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+ this.savepointPath = savepointPath;
+ }
+
+ protected AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, boolean allowNonRestoredState) {
+ super(allowNonRestoredState);
this.savepointPath = savepointPath;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/80348d65/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
new file mode 100644
index 0000000..fb4dcf5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change removes an operator from a chain.
+ *
+ * <p>This test specifically checks that stateless operators can be removed even if all states from the previous job
+ * must be restored.
+ */
+public class ChainLengthStatelessDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+ public ChainLengthStatelessDecreaseTest(String savepointPath) {
+ super(savepointPath, false);
+ }
+
+ @Override
+ public void createRestoredJob(StreamExecutionEnvironment env) {
+ /*
+ * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+ * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> StatefulMap3)
+ */
+ DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+ SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+ first.startNewChain();
+
+ SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+ second.startNewChain();
+
+ SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, second);
+ }
+}