You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/24 14:14:15 UTC
[4/5] flink git commit: [FLINK-7623][tests] Add tests verifying
isRestored flag
[FLINK-7623][tests] Add tests verifying isRestored flag
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc32991a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc32991a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc32991a
Branch: refs/heads/master
Commit: bc32991a358889464a5b596684aa40ff3e31acbe
Parents: 4379d0d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Oct 18 15:01:37 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 24 16:13:25 2017 +0200
----------------------------------------------------------------------
.../checkpoint/StateAssignmentOperation.java | 2 +-
.../runtime/tasks/RestoreStreamTaskTest.java | 359 +++++++++++++++++++
2 files changed, 360 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bc32991a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6a58acb..d80311c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -219,7 +219,7 @@ public class StateAssignmentOperation {
}
}
- private static OperatorSubtaskState operatorSubtaskStateFrom(
+ public static OperatorSubtaskState operatorSubtaskStateFrom(
OperatorInstanceID instanceID,
Map<OperatorInstanceID, List<OperatorStateHandle>> subManagedOperatorState,
Map<OperatorInstanceID, List<OperatorStateHandle>> subRawOperatorState,
http://git-wip-us.apache.org/repos/asf/flink/blob/bc32991a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
new file mode 100644
index 0000000..4824097
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+
+ private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Empties the shared {@code RESTORED_OPERATORS} set before each test, so that a test only
+ * observes the operator IDs recorded during its own run.
+ */
+ @Before
+ public void setup() {
+ RESTORED_OPERATORS.clear();
+ }
+
+ /**
+  * Runs an operator chain once to obtain a checkpoint and then runs a chain with the same
+  * operator IDs from that checkpoint. Both operators are expected to be recorded as restored.
+  */
+ @Test
+ public void testRestore() throws Exception {
+     OperatorID headOperatorID = new OperatorID(42L, 42L);
+     OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+     // First run starts without initial state; it only produces the checkpoint used below.
+     AcknowledgeStreamMockEnvironment environment1 = createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.empty());
+
+     assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+     TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+     // Second run uses the same operator IDs and is seeded with the first run's snapshot.
+     // Its environment is not needed afterwards, so the return value is not kept.
+     createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.of(stateHandles));
+
+     // A plain HashSet avoids the anonymous subclass created by double-brace initialization.
+     Set<OperatorID> expectedRestored = new HashSet<>();
+     expectedRestored.add(headOperatorID);
+     expectedRestored.add(tailOperatorID);
+     assertEquals(expectedRestored, RESTORED_OPERATORS);
+ }
+
+ /**
+  * Restores a chain whose head operator was given a different {@link OperatorID} than the one
+  * used when the checkpoint was taken. Only the tail operator, whose ID is unchanged, is
+  * expected to be recorded as restored.
+  */
+ @Test
+ public void testRestoreHeadWithNewId() throws Exception {
+     OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+     // First run starts without initial state; it only produces the checkpoint used below.
+     AcknowledgeStreamMockEnvironment environment1 = createRunAndCheckpointOperatorChain(
+             new OperatorID(42L, 42L),
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.empty());
+
+     assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+     TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+     // Second run: the head operator gets a new ID, the tail keeps the original one.
+     // The returned environment is not needed afterwards, so it is not kept.
+     createRunAndCheckpointOperatorChain(
+             new OperatorID(4242L, 4242L),
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.of(stateHandles));
+
+     // Collections.singleton replaces the double-brace initialized anonymous HashSet subclass.
+     assertEquals(Collections.singleton(tailOperatorID), RESTORED_OPERATORS);
+ }
+
+ /**
+  * Restores a chain whose tail operator was given a different {@link OperatorID} than the one
+  * used when the checkpoint was taken. Only the head operator, whose ID is unchanged, is
+  * expected to be recorded as restored.
+  */
+ @Test
+ public void testRestoreTailWithNewId() throws Exception {
+     OperatorID headOperatorID = new OperatorID(42L, 42L);
+
+     // First run starts without initial state; it only produces the checkpoint used below.
+     AcknowledgeStreamMockEnvironment environment1 = createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             new OperatorID(44L, 44L),
+             new CounterOperator(),
+             Optional.empty());
+
+     assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+     TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+     // Second run: the tail operator gets a new ID, the head keeps the original one.
+     // The returned environment is not needed afterwards, so it is not kept.
+     createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             new OperatorID(4444L, 4444L),
+             new CounterOperator(),
+             Optional.of(stateHandles));
+
+     // Collections.singleton replaces the double-brace initialized anonymous HashSet subclass.
+     assertEquals(Collections.singleton(headOperatorID), RESTORED_OPERATORS);
+ }
+
+ /**
+  * Restores a chain after the head operator's entry in the snapshot has been replaced by an
+  * {@link OperatorSubtaskState} built from empty state maps (the situation this test uses to
+  * model a scale up). Both operators are still expected to be recorded as restored.
+  */
+ @Test
+ public void testRestoreAfterScaleUp() throws Exception {
+     OperatorID headOperatorID = new OperatorID(42L, 42L);
+     OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+     // First run starts without initial state; it only produces the checkpoint used below.
+     AcknowledgeStreamMockEnvironment environment1 = createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.empty());
+
+     assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+     // test empty state in case of scale up
+     OperatorSubtaskState emptyHeadOperatorState = StateAssignmentOperation.operatorSubtaskStateFrom(
+             new OperatorInstanceID(0, headOperatorID),
+             Collections.emptyMap(),
+             Collections.emptyMap(),
+             Collections.emptyMap(),
+             Collections.emptyMap());
+
+     // Overwrite the head operator's entry in the snapshot with the empty state.
+     TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+     stateHandles.putSubtaskStateByOperatorID(headOperatorID, emptyHeadOperatorState);
+
+     // Second run is seeded with the modified snapshot; its environment is not needed afterwards.
+     createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new CounterOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.of(stateHandles));
+
+     // A plain HashSet avoids the anonymous subclass created by double-brace initialization.
+     Set<OperatorID> expectedRestored = new HashSet<>();
+     expectedRestored.add(headOperatorID);
+     expectedRestored.add(tailOperatorID);
+     assertEquals(expectedRestored, RESTORED_OPERATORS);
+ }
+
+ /**
+  * Restores a chain whose head is a {@code StatelessOperator}, i.e. an operator that writes
+  * nothing in {@code snapshotState}. Both the head and the tail operator are expected to be
+  * recorded as restored.
+  */
+ @Test
+ public void testRestoreWithoutState() throws Exception {
+     OperatorID headOperatorID = new OperatorID(42L, 42L);
+     OperatorID tailOperatorID = new OperatorID(44L, 44L);
+
+     // First run starts without initial state; it only produces the checkpoint used below.
+     AcknowledgeStreamMockEnvironment environment1 = createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new StatelessOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.empty());
+
+     assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+     TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+     // Second run uses the same operator IDs and is seeded with the first run's snapshot.
+     // Its environment is not needed afterwards, so the return value is not kept.
+     createRunAndCheckpointOperatorChain(
+             headOperatorID,
+             new StatelessOperator(),
+             tailOperatorID,
+             new CounterOperator(),
+             Optional.of(stateHandles));
+
+     // A plain HashSet avoids the anonymous subclass created by double-brace initialization.
+     Set<OperatorID> expectedRestored = new HashSet<>();
+     expectedRestored.add(headOperatorID);
+     expectedRestored.add(tailOperatorID);
+     assertEquals(expectedRestored, RESTORED_OPERATORS);
+ }
+
+ /**
+  * Sets up a {@link OneInputStreamTask} with a two-operator chain, optionally hands it an
+  * initial state, pushes the test records through it, triggers one checkpoint and waits for
+  * the task to complete.
+  *
+  * @param headId operator ID assigned to the head of the chain
+  * @param headOperator operator instance used as the head of the chain
+  * @param tailId operator ID assigned to the chained (tail) operator
+  * @param tailOperator operator instance chained after the head
+  * @param stateHandles state to set on the task before it is invoked; empty for a fresh start
+  * @return the environment the task was invoked with
+  */
+ private AcknowledgeStreamMockEnvironment createRunAndCheckpointOperatorChain(
+         OperatorID headId,
+         OneInputStreamOperator<String, String> headOperator,
+         OperatorID tailId,
+         OneInputStreamOperator<String, String> tailOperator,
+         Optional<TaskStateSnapshot> stateHandles) throws Exception {
+
+     final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+     final OneInputStreamTaskTestHarness<String, String> harness = new OneInputStreamTaskTestHarness<>(
+             task,
+             1,
+             1,
+             BasicTypeInfo.STRING_TYPE_INFO,
+             BasicTypeInfo.STRING_TYPE_INFO);
+
+     harness.setupOperatorChain(headId, headOperator)
+             .chain(tailId, tailOperator, StringSerializer.INSTANCE)
+             .finish();
+
+     final AcknowledgeStreamMockEnvironment mockEnvironment = new AcknowledgeStreamMockEnvironment(
+             harness.jobConfig,
+             harness.taskConfig,
+             harness.executionConfig,
+             harness.memorySize,
+             new MockInputSplitProvider(),
+             harness.bufferSize);
+
+     // The initial state has to be set before the task is invoked.
+     if (stateHandles.isPresent()) {
+         task.setInitialState(stateHandles.get());
+     }
+
+     harness.invoke(mockEnvironment);
+     harness.waitForTaskRunning();
+
+     processRecords(harness);
+     triggerCheckpoint(harness, mockEnvironment, task);
+
+     harness.endInput();
+     harness.waitForTaskCompletion();
+
+     return mockEnvironment;
+ }
+
+ private void triggerCheckpoint(
+ OneInputStreamTaskTestHarness<String, String> testHarness,
+ AcknowledgeStreamMockEnvironment environment,
+ OneInputStreamTask<String, String> streamTask) throws Exception {
+ long checkpointId = 1L;
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L);
+
+ while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())) {}
+
+ environment.getCheckpointLatch().await();
+ assertEquals(checkpointId, environment.getCheckpointId());
+ }
+
+ private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception {
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.processElement(new StreamRecord<>("10"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("20"), 0, 0);
+ testHarness.processElement(new StreamRecord<>("30"), 0, 0);
+
+ testHarness.waitForInputProcessing();
+
+ expectedOutput.add(new StreamRecord<>("10"));
+ expectedOutput.add(new StreamRecord<>("20"));
+ expectedOutput.add(new StreamRecord<>("30"));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+ }
+
+ /**
+  * Base class for the test operators: when state initialization reports a restore, the
+  * operator's ID is added to {@code RESTORED_OPERATORS}.
+  */
+ private abstract static class RestoreWatchOperator<IN, OUT>
+         extends AbstractStreamOperator<OUT>
+         implements OneInputStreamOperator<IN, OUT> {
+
+     @Override
+     public void initializeState(StateInitializationContext context) throws Exception {
+         if (!context.isRestored()) {
+             // Fresh start: nothing to record.
+             return;
+         }
+         RESTORED_OPERATORS.add(getOperatorID());
+     }
+ }
+
+ /**
+  * Operator that counts the processed elements and stores the count in operator list state.
+  *
+  * <p>Each element is forwarded unchanged. On restore, all values found in the list state are
+  * added to the in-memory counter.
+  */
+ private static class CounterOperator extends RestoreWatchOperator<String, String> {
+     private static final long serialVersionUID = 2048954179291813243L;
+
+     private ListState<Long> counterState;
+     private long counter = 0;
+
+     /** Increments the counter and emits the element as is. */
+     @Override
+     public void processElement(StreamRecord<String> element) throws Exception {
+         counter++;
+         output.collect(element);
+     }
+
+     /**
+      * Records the restore (via the superclass), obtains the {@code "counter-state"} list state
+      * and, if restoring, folds every stored value into the counter.
+      */
+     @Override
+     public void initializeState(StateInitializationContext context) throws Exception {
+         super.initializeState(context);
+
+         counterState = context
+                 .getOperatorStateStore()
+                 .getListState(new ListStateDescriptor<>("counter-state", LongSerializer.INSTANCE));
+
+         if (context.isRestored()) {
+             for (Long value : counterState.get()) {
+                 counter += value;
+             }
+             // The restored values now live in the counter; drop them from the state.
+             counterState.clear();
+         }
+     }
+
+     /** Writes the current counter as the only entry of the list state. */
+     @Override
+     public void snapshotState(StateSnapshotContext context) throws Exception {
+         // Clear first: without it every snapshot would append another entry to the list,
+         // and a later restore would sum all of them and over-count.
+         counterState.clear();
+         counterState.add(counter);
+     }
+ }
+
+ /**
+ * Operator without any state of its own: it forwards every element unchanged and writes
+ * nothing on snapshot. Restores are recorded by the inherited
+ * {@code RestoreWatchOperator#initializeState}.
+ */
+ private static class StatelessOperator extends RestoreWatchOperator<String, String> {
+
+ private static final long serialVersionUID = 2048954179291813244L;
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws Exception {
+ output.collect(element);
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // Intentionally empty: this operator has nothing to snapshot.
+ }
+ }
+}