You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:26 UTC
[19/50] [abbrv] flink git commit: [FLINK-5892] Add tests for topology
modifications
[FLINK-5892] Add tests for topology modifications
This closes #3770.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c68085f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c68085f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c68085f
Branch: refs/heads/table-retraction
Commit: 2c68085f658873c2d5836fbad6b82be76a79f0f9
Parents: f7980a7
Author: guowei.mgw <gu...@gmail.com>
Authored: Fri Apr 28 19:40:58 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 28 20:11:39 2017 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinatorTest.java | 292 +++++++++++++++++++
1 file changed, 292 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2c68085f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 0d2e903..41b0e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2370,6 +2370,22 @@ public class CheckpointCoordinatorTest {
testRestoreLatestCheckpointedStateWithChangingParallelism(false);
}
+ @Test
+ public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+ testStateRecoveryWithTopologyChange(0);
+ }
+
+ @Test
+ public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+ testStateRecoveryWithTopologyChange(1);
+ }
+
+ @Test
+ public void testStateRecoveryWhenTopologyChange() throws Exception {
+ testStateRecoveryWithTopologyChange(2);
+ }
+
+
/**
* Tests the checkpoint restoration with changing parallelism of job vertex with partitioned
* state.
@@ -2530,6 +2546,282 @@ public class CheckpointCoordinatorTest {
comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
}
+ private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
+ JobVertexID jobVertexID = new JobVertexID();
+ OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+ return new Tuple2<>(jobVertexID, operatorID);
+ }
+
+ /**
+ * old topology
+ * [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
+ *
+ *
+ * new topology
+ *
+ * [operator5,operator1,operator2] * newParallelism1 -> [operator3, operator6] * newParallelism2
+ *
+ * scaleType:
+ * 0 increase parallelism
+ * 1 decrease parallelism
+ * 2 same parallelism
+ */
+ public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
+
+ /**
+ * Old topology
+ * CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * parallelism2
+ */
+ Tuple2<JobVertexID, OperatorID> id1 = generateIDPair();
+ Tuple2<JobVertexID, OperatorID> id2 = generateIDPair();
+ int parallelism1 = 10;
+ int maxParallelism1 = 64;
+
+ Tuple2<JobVertexID, OperatorID> id3 = generateIDPair();
+ Tuple2<JobVertexID, OperatorID> id4 = generateIDPair();
+ int parallelism2 = 10;
+ int maxParallelism2 = 64;
+
+ List<KeyGroupRange> keyGroupPartitions2 =
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+
+ Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+
+ //prepare vertex1 state
+ for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id1, id2)) {
+ OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
+ operatorStates.put(id.f1, taskState);
+ for (int index = 0; index < taskState.getParallelism(); index++) {
+ StreamStateHandle subNonPartitionedState =
+ generateStateForVertex(id.f0, index)
+ .get(0);
+ OperatorStateHandle subManagedOperatorState =
+ generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+ .get(0);
+ OperatorStateHandle subRawOperatorState =
+ generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+ .get(0);
+
+ OperatorSubtaskState subtaskState = new OperatorSubtaskState(subNonPartitionedState,
+ subManagedOperatorState,
+ subRawOperatorState,
+ null,
+ null);
+ taskState.putState(index, subtaskState);
+ }
+ }
+
+ List<List<ChainedStateHandle<OperatorStateHandle>>> expectedManagedOperatorStates = new ArrayList<>();
+ List<List<ChainedStateHandle<OperatorStateHandle>>> expectedRawOperatorStates = new ArrayList<>();
+ //prepare vertex2 state
+ for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id3, id4)) {
+ OperatorState operatorState = new OperatorState(id.f1, parallelism2, maxParallelism2);
+ operatorStates.put(id.f1, operatorState);
+ List<ChainedStateHandle<OperatorStateHandle>> expectedManagedOperatorState = new ArrayList<>();
+ List<ChainedStateHandle<OperatorStateHandle>> expectedRawOperatorState = new ArrayList<>();
+ expectedManagedOperatorStates.add(expectedManagedOperatorState);
+ expectedRawOperatorStates.add(expectedRawOperatorState);
+
+ for (int index = 0; index < operatorState.getParallelism(); index++) {
+ OperatorStateHandle subManagedOperatorState =
+ generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+ .get(0);
+ OperatorStateHandle subRawOperatorState =
+ generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+ .get(0);
+ KeyGroupsStateHandle subManagedKeyedState = id.f0.equals(id3.f0)
+ ? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), false)
+ : null;
+ KeyGroupsStateHandle subRawKeyedState = id.f0.equals(id3.f0)
+ ? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), true)
+ : null;
+
+ expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle(subManagedOperatorState));
+ expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState));
+
+ OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+ null,
+ subManagedOperatorState,
+ subRawOperatorState,
+ subManagedKeyedState,
+ subRawKeyedState);
+ operatorState.putState(index, subtaskState);
+ }
+ }
+
+ /**
+ * New topology
+ * CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> op6) * newParallelism2
+ */
+ Tuple2<JobVertexID, OperatorID> id5 = generateIDPair();
+ int newParallelism1 = 10;
+
+ Tuple2<JobVertexID, OperatorID> id6 = generateIDPair();
+ int newParallelism2 = parallelism2;
+
+ if (scaleType == 0) {
+ newParallelism2 = 20;
+ } else if (scaleType == 1) {
+ newParallelism2 = 8;
+ }
+
+ List<KeyGroupRange> newKeyGroupPartitions2 =
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+
+ final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
+ id5.f0,
+ Lists.newArrayList(id2.f1, id1.f1, id5.f1),
+ newParallelism1,
+ maxParallelism1);
+
+ final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
+ id3.f0,
+ Lists.newArrayList(id6.f1, id3.f1),
+ newParallelism2,
+ maxParallelism2);
+
+ Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+
+ tasks.put(id5.f0, newJobVertex1);
+ tasks.put(id3.f0, newJobVertex2);
+
+ JobID jobID = new JobID();
+ StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore =
+ spy(new StandaloneCompletedCheckpointStore(1));
+
+ CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
+ jobID,
+ 2,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 3000,
+ operatorStates,
+ Collections.<MasterState>emptyList(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ null);
+
+ when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ newJobVertex1.getTaskVertices(),
+ newJobVertex1.getTaskVertices(),
+ newJobVertex1.getTaskVertices(),
+ new StandaloneCheckpointIDCounter(),
+ standaloneCompletedCheckpointStore,
+ null,
+ Executors.directExecutor());
+
+ coord.restoreLatestCheckpointedState(tasks, false, true);
+
+ for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
+
+ TaskStateHandles taskStateHandles = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+ ChainedStateHandle<StreamStateHandle> actualSubNonPartitionedState = taskStateHandles.getLegacyOperatorState();
+ List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = taskStateHandles.getManagedOperatorState();
+ List<Collection<OperatorStateHandle>> actualSubRawOperatorState = taskStateHandles.getRawOperatorState();
+
+ assertNull(taskStateHandles.getManagedKeyedState());
+ assertNull(taskStateHandles.getRawKeyedState());
+
+ // operator5
+ {
+ int operatorIndexInChain = 2;
+ assertNull(actualSubNonPartitionedState.get(operatorIndexInChain));
+ assertNull(actualSubManagedOperatorState.get(operatorIndexInChain));
+ assertNull(actualSubRawOperatorState.get(operatorIndexInChain));
+ }
+ // operator1
+ {
+ int operatorIndexInChain = 1;
+ ChainedStateHandle<StreamStateHandle> expectSubNonPartitionedState = generateStateForVertex(id1.f0, i);
+ ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+ id1.f0, i, 2, 8, false);
+ ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+ id1.f0, i, 2, 8, true);
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(
+ expectSubNonPartitionedState.get(0).openInputStream(),
+ actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+ actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+ actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+ }
+ // operator2
+ {
+ int operatorIndexInChain = 0;
+ ChainedStateHandle<StreamStateHandle> expectSubNonPartitionedState = generateStateForVertex(id2.f0, i);
+ ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+ id2.f0, i, 2, 8, false);
+ ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+ id2.f0, i, 2, 8, true);
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(expectSubNonPartitionedState.get(0).openInputStream(),
+ actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+ actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+ assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+ actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+ }
+ }
+
+ List<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+ List<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+
+ for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
+ TaskStateHandles taskStateHandles = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+
+ // operator 3
+ {
+ int operatorIndexInChain = 1;
+ List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
+ actualSubManagedOperatorState.add(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+
+ List<Collection<OperatorStateHandle>> actualSubRawOperatorState = new ArrayList<>(1);
+ actualSubRawOperatorState.add(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+
+ actualManagedOperatorStates.add(actualSubManagedOperatorState);
+ actualRawOperatorStates.add(actualSubRawOperatorState);
+
+ assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+ }
+
+ // operator 6
+ {
+ int operatorIndexInChain = 0;
+ assertNull(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+ assertNull(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+ assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+
+ }
+
+ KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), false);
+ KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
+
+
+ Collection<KeyedStateHandle> keyedStateBackend = taskStateHandles.getManagedKeyedState();
+ Collection<KeyedStateHandle> keyGroupStateRaw = taskStateHandles.getRawKeyedState();
+
+
+ compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
+ compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+ }
+
+ comparePartitionableState(expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
+ comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates);
+ }
+
/**
* Tests that the externalized checkpoint configuration is respected.
*/