Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:26 UTC

[19/50] [abbrv] flink git commit: [FLINK-5892] Add tests for topology modifications

[FLINK-5892] Add tests for topology modifications

This closes #3770.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c68085f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c68085f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c68085f

Branch: refs/heads/table-retraction
Commit: 2c68085f658873c2d5836fbad6b82be76a79f0f9
Parents: f7980a7
Author: guowei.mgw <gu...@gmail.com>
Authored: Fri Apr 28 19:40:58 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Apr 28 20:11:39 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinatorTest.java   | 292 +++++++++++++++++++
 1 file changed, 292 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c68085f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 0d2e903..41b0e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2370,6 +2370,22 @@ public class CheckpointCoordinatorTest {
 		testRestoreLatestCheckpointedStateWithChangingParallelism(false);
 	}
 
+	@Test
+	public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+		testStateRecoveryWithTopologyChange(0);
+	}
+
+	@Test
+	public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+		testStateRecoveryWithTopologyChange(1);
+	}
+
+	@Test
+	public void testStateRecoveryWhenTopologyChange() throws Exception {
+		testStateRecoveryWithTopologyChange(2);
+	}
+
 	/**
 	 * Tests the checkpoint restoration with changing parallelism of job vertex with partitioned
 	 * state.
@@ -2530,6 +2546,282 @@ public class CheckpointCoordinatorTest {
 		comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
 	}
 
+	/**
+	 * Creates a fresh JobVertexID together with the OperatorID that
+	 * OperatorID.fromJobVertexID derives from it.
+	 */
+	private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
+		JobVertexID jobVertexID = new JobVertexID();
+		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+		return new Tuple2<>(jobVertexID, operatorID);
+	}
+
+	/**
+	 * Old topology:
+	 * [operator1, operator2] * parallelism1 -> [operator3, operator4] * parallelism2
+	 *
+	 * New topology:
+	 * [operator5, operator1, operator2] * newParallelism1 -> [operator3, operator6] * newParallelism2
+	 *
+	 * scaleType controls the parallelism of the second vertex:
+	 * 0: scale out (newParallelism2 > parallelism2)
+	 * 1: scale in (newParallelism2 < parallelism2)
+	 * 2: parallelism unchanged (newParallelism2 == parallelism2)
+	 */
+	public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
+
+		/*
+		 * Old topology:
+		 * CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * parallelism2
+		 */
+		Tuple2<JobVertexID, OperatorID> id1 = generateIDPair();
+		Tuple2<JobVertexID, OperatorID> id2 = generateIDPair();
+		int parallelism1 = 10;
+		int maxParallelism1 = 64;
+
+		Tuple2<JobVertexID, OperatorID> id3 = generateIDPair();
+		Tuple2<JobVertexID, OperatorID> id4 = generateIDPair();
+		int parallelism2 = 10;
+		int maxParallelism2 = 64;
+
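+		// pre-compute the key-group ranges that each old subtask of the second vertex was assigned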
+		List<KeyGroupRange> keyGroupPartitions2 =
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+
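+		// all operator states of the old topology, keyed by OperatorID as stored in a checkpoint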
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+
+		// prepare the state of the first old job vertex: CHAIN(op1 -> op2)
+		for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id1, id2)) {
+			OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
+			operatorStates.put(id.f1, taskState);
+			for (int index = 0; index < taskState.getParallelism(); index++) {
+				StreamStateHandle subNonPartitionedState =
+					generateStateForVertex(id.f0, index)
+						.get(0);
+				OperatorStateHandle subManagedOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+						.get(0);
+				OperatorStateHandle subRawOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+						.get(0);
+
+				OperatorSubtaskState subtaskState = new OperatorSubtaskState(subNonPartitionedState,
+					subManagedOperatorState,
+					subRawOperatorState,
+					null,
+					null);
+				taskState.putState(index, subtaskState);
+			}
+		}
+
+		List<List<ChainedStateHandle<OperatorStateHandle>>> expectedManagedOperatorStates = new ArrayList<>();
+		List<List<ChainedStateHandle<OperatorStateHandle>>> expectedRawOperatorStates = new ArrayList<>();
+		// prepare the state of the second old job vertex: CHAIN(op3 -> op4)
+		for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id3, id4)) {
+			OperatorState operatorState = new OperatorState(id.f1, parallelism2, maxParallelism2);
+			operatorStates.put(id.f1, operatorState);
+			List<ChainedStateHandle<OperatorStateHandle>> expectedManagedOperatorState = new ArrayList<>();
+			List<ChainedStateHandle<OperatorStateHandle>> expectedRawOperatorState = new ArrayList<>();
+			expectedManagedOperatorStates.add(expectedManagedOperatorState);
+			expectedRawOperatorStates.add(expectedRawOperatorState);
+
+			for (int index = 0; index < operatorState.getParallelism(); index++) {
+				OperatorStateHandle subManagedOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+						.get(0);
+				OperatorStateHandle subRawOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+						.get(0);
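+				// keyed state exists only for operator3; operator4 carries none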
+				KeyGroupsStateHandle subManagedKeyedState = id.f0.equals(id3.f0)
+					? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), false)
+					: null;
+				KeyGroupsStateHandle subRawKeyedState = id.f0.equals(id3.f0)
+					? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), true)
+					: null;
+
+				expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle(subManagedOperatorState));
+				expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState));
+
+				OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+					null,
+					subManagedOperatorState,
+					subRawOperatorState,
+					subManagedKeyedState,
+					subRawKeyedState);
+				operatorState.putState(index, subtaskState);
+			}
+		}
+
+		/*
+		 * New topology:
+		 * CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> op6) * newParallelism2
+		 */
+		Tuple2<JobVertexID, OperatorID> id5 = generateIDPair();
+		int newParallelism1 = 10;
+
+		Tuple2<JobVertexID, OperatorID> id6 = generateIDPair();
+		int newParallelism2 = parallelism2;
+
+		if (scaleType == 0) {
+			newParallelism2 = 20; // scale out: 10 -> 20 subtasks
+		} else if (scaleType == 1) {
+			newParallelism2 = 8; // scale in: 10 -> 8 subtasks
+		}
+		// scaleType == 2: parallelism of the second vertex stays at 10
+
+		List<KeyGroupRange> newKeyGroupPartitions2 =
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+
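+		// mock the new job vertices; operator IDs are listed from the tail of the chain to its head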
+		final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
+			id5.f0,
+			Lists.newArrayList(id2.f1, id1.f1, id5.f1),
+			newParallelism1,
+			maxParallelism1);
+
+		final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
+			id3.f0,
+			Lists.newArrayList(id6.f1, id3.f1),
+			newParallelism2,
+			maxParallelism2);
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+
+		tasks.put(id5.f0, newJobVertex1);
+		tasks.put(id3.f0, newJobVertex2);
+
+		JobID jobID = new JobID();
+		StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore =
+			spy(new StandaloneCompletedCheckpointStore(1));
+
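+		// spying on the store lets us stub getLatestCheckpoint() below to return a hand-built checkpoint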
+		CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
+				jobID,
+				2,
+				System.currentTimeMillis(),
+				System.currentTimeMillis() + 3000,
+				operatorStates,
+				Collections.<MasterState>emptyList(),
+				CheckpointProperties.forStandardCheckpoint(),
+				null,
+				null);
+
+		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
+
+		// set up a coordinator that restores from the spied-on checkpoint store
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			new JobID(),
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			newJobVertex1.getTaskVertices(),
+			newJobVertex1.getTaskVertices(),
+			newJobVertex1.getTaskVertices(),
+			new StandaloneCheckpointIDCounter(),
+			standaloneCompletedCheckpointStore,
+			null,
+			Executors.directExecutor());
+
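+		// restore from the stubbed checkpoint; operator4 no longer exists in the new
+		// topology, so non-restored state must be tolerated (third argument)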
+		coord.restoreLatestCheckpointedState(tasks, false, true);
+
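+		// verify the state handed to each subtask of the first new vertex: CHAIN(op5 -> op1 -> op2)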
+		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
+
+			TaskStateHandles taskStateHandles = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+			ChainedStateHandle<StreamStateHandle> actualSubNonPartitionedState = taskStateHandles.getLegacyOperatorState();
+			List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = taskStateHandles.getManagedOperatorState();
+			List<Collection<OperatorStateHandle>> actualSubRawOperatorState = taskStateHandles.getRawOperatorState();
+
+			assertNull(taskStateHandles.getManagedKeyedState());
+			assertNull(taskStateHandles.getRawKeyedState());
+
+			// operator5
+			{
+				int operatorIndexInChain = 2;
+				assertNull(actualSubNonPartitionedState.get(operatorIndexInChain));
+				assertNull(actualSubManagedOperatorState.get(operatorIndexInChain));
+				assertNull(actualSubRawOperatorState.get(operatorIndexInChain));
+			}
+			// operator1
+			{
+				int operatorIndexInChain = 1;
+				ChainedStateHandle<StreamStateHandle> expectSubNonPartitionedState = generateStateForVertex(id1.f0, i);
+				ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+					id1.f0, i, 2, 8, false);
+				ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+					id1.f0, i, 2, 8, true);
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(
+					expectSubNonPartitionedState.get(0).openInputStream(),
+					actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+					actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+					actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+			}
+			// operator2
+			{
+				int operatorIndexInChain = 0;
+				ChainedStateHandle<StreamStateHandle> expectSubNonPartitionedState = generateStateForVertex(id2.f0, i);
+				ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+					id2.f0, i, 2, 8, false);
+				ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+					id2.f0, i, 2, 8, true);
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectSubNonPartitionedState.get(0).openInputStream(),
+					actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+					actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+					actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+			}
+		}
+
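+		// collect the operator state re-distributed to operator3 for comparison against the original below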
+		List<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+		List<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+
+		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
+			TaskStateHandles taskStateHandles = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+
+			// operator 3
+			{
+				int operatorIndexInChain = 1;
+				List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
+				actualSubManagedOperatorState.add(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+
+				List<Collection<OperatorStateHandle>> actualSubRawOperatorState = new ArrayList<>(1);
+				actualSubRawOperatorState.add(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+
+				actualManagedOperatorStates.add(actualSubManagedOperatorState);
+				actualRawOperatorStates.add(actualSubRawOperatorState);
+
+				assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+			}
+
+			// operator 6
+			{
+				int operatorIndexInChain = 0;
+				assertNull(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+				assertNull(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+				assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+			}
+
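+			// keyed state must have been re-partitioned according to the new key-group ranges of operator3's vertex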
+			KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), false);
+			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
+
+			Collection<KeyedStateHandle> keyedStateBackend = taskStateHandles.getManagedKeyedState();
+			Collection<KeyedStateHandle> keyGroupStateRaw = taskStateHandles.getRawKeyedState();
+
+			compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
+			compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+		}
+
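+		// across all new subtasks, the re-distributed operator state of operator3 must equal the original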
+		comparePartitionableState(expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
+		comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates);
+	}
+
 	/**
 	 * Tests that the externalized checkpoint configuration is respected.
 	 */