You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/03/05 14:52:29 UTC

[flink] 04/06: [hotfix][checkpointing] Minor code cleanups for 'checkpoint.metadata' test classes

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit add072264a4415be774099192408c13cfbfca320
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 27 15:39:26 2020 +0100

    [hotfix][checkpointing] Minor code cleanups for 'checkpoint.metadata' test classes
    
      - fix checkstyle
      - Adjust names of tests with renamed main scope classes
      - remove test util methods that are no longer used after Legacy State was dropped.
---
 ...dataV2Test.java => CheckpointMetadataTest.java} |  10 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   | 101 +--------------------
 .../metadata/MetadataV3SerializerTest.java         |   4 +-
 3 files changed, 10 insertions(+), 105 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
similarity index 92%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
index ff93d5d..f4cc7d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
@@ -30,13 +30,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class MetadataV2Test {
+/**
+ * Simple tests for the {@link CheckpointMetadata} data holder class.
+ */
+public class CheckpointMetadataTest {
 
-	/**
-	 * Simple test of savepoint methods.
-	 */
 	@Test
-	public void testSavepointV2() throws Exception {
+	public void testConstructAndDispose() throws Exception {
 		final Random rnd = new Random();
 
 		final long checkpointId = rnd.nextInt(Integer.MAX_VALUE) + 1;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index b526d05..f411a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -22,11 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -59,13 +55,6 @@ public class CheckpointTestUtils {
 	/**
 	 * Creates a random collection of OperatorState objects containing various types of state handles.
 	 */
-	public static Collection<OperatorState> createOperatorStates(int numTaskStates, int numSubtasksPerTask) {
-		return createOperatorStates(new Random(), numTaskStates, numSubtasksPerTask);
-	}
-
-	/**
-	 * Creates a random collection of OperatorState objects containing various types of state handles.
-	 */
 	public static Collection<OperatorState> createOperatorStates(
 			Random random,
 			int numTaskStates,
@@ -93,7 +82,7 @@ public class CheckpointTestUtils {
 
 				OperatorStateHandle operatorStateHandleBackend = null;
 				OperatorStateHandle operatorStateHandleStream = null;
-				
+
 				Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
 				offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
 				offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
@@ -136,89 +125,6 @@ public class CheckpointTestUtils {
 	}
 
 	/**
-	 * Creates a random collection of TaskState objects containing various types of state handles.
-	 */
-	public static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtasksPerTask) {
-		return createTaskStates(new Random(), numTaskStates, numSubtasksPerTask);
-	}
-
-	/**
-	 * Creates a random collection of TaskState objects containing various types of state handles.
-	 */
-	public static Collection<TaskState> createTaskStates(
-			Random random,
-			int numTaskStates,
-			int numSubtasksPerTask) {
-
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int stateIdx = 0; stateIdx < numTaskStates; ++stateIdx) {
-
-			int chainLength = 1 + random.nextInt(8);
-
-			TaskState taskState = new TaskState(new JobVertexID(), numSubtasksPerTask, 128, chainLength);
-
-			int noNonPartitionableStateAtIndex = random.nextInt(chainLength);
-			int noOperatorStateBackendAtIndex = random.nextInt(chainLength);
-			int noOperatorStateStreamAtIndex = random.nextInt(chainLength);
-
-			boolean hasKeyedBackend = random.nextInt(4) != 0;
-			boolean hasKeyedStream = random.nextInt(4) != 0;
-
-			for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
-
-				List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(chainLength);
-				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(chainLength);
-
-				for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
-
-					StreamStateHandle operatorStateBackend =
-							new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
-					StreamStateHandle operatorStateStream =
-							new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
-					Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
-					offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-					offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-					offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.UNION));
-
-					if (chainIdx != noOperatorStateBackendAtIndex) {
-						OperatorStateHandle operatorStateHandleBackend =
-								new OperatorStreamStateHandle(offsetsMap, operatorStateBackend);
-						operatorStatesBackend.add(operatorStateHandleBackend);
-					}
-
-					if (chainIdx != noOperatorStateStreamAtIndex) {
-						OperatorStateHandle operatorStateHandleStream =
-								new OperatorStreamStateHandle(offsetsMap, operatorStateStream);
-						operatorStatesStream.add(operatorStateHandleStream);
-					}
-				}
-
-				KeyGroupsStateHandle keyedStateBackend = null;
-				KeyGroupsStateHandle keyedStateStream = null;
-
-				if (hasKeyedBackend) {
-					keyedStateBackend = createDummyKeyGroupStateHandle(random);
-				}
-
-				if (hasKeyedStream) {
-					keyedStateStream = createDummyKeyGroupStateHandle(random);
-				}
-
-				taskState.putState(subtaskIdx, new SubtaskState(
-						new ChainedStateHandle<>(operatorStatesBackend),
-						new ChainedStateHandle<>(operatorStatesStream),
-						keyedStateStream,
-						keyedStateBackend));
-			}
-
-			taskStates.add(taskState);
-		}
-
-		return taskStates;
-	}
-
-	/**
 	 * Creates a bunch of random master states.
 	 */
 	public static Collection<MasterState> createRandomMasterStates(Random random, int num) {
@@ -238,7 +144,7 @@ public class CheckpointTestUtils {
 
 	/**
 	 * Asserts that two MasterStates are equal.
-	 * 
+	 *
 	 * <p>The MasterState avoids overriding {@code equals()} on purpose, because equality is not well
 	 * defined in the raw contents.
 	 */
@@ -251,10 +157,9 @@ public class CheckpointTestUtils {
 
 	// ------------------------------------------------------------------------
 
-	/** utility class, not meant to be instantiated */
+	/** utility class, not meant to be instantiated. */
 	private CheckpointTestUtils() {}
 
-
 	public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
 		return new IncrementalRemoteKeyedStateHandle(
 			createRandomUUID(rnd),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
index 0147e54..9e4456c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
@@ -38,7 +38,7 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Various tests for the version 2 format serializer of a checkpoint.
+ * Various tests for the version 3 format serializer of a checkpoint.
  */
 public class MetadataV3SerializerTest {
 
@@ -127,7 +127,7 @@ public class MetadataV3SerializerTest {
 		ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
 		DataOutputStream out = new DataOutputViewStreamWrapper(baos);
 
-		serializer.serialize(new CheckpointMetadata(checkpointId, operatorStates, masterStates), out);
+		MetadataV3Serializer.serialize(new CheckpointMetadata(checkpointId, operatorStates, masterStates), out);
 		out.close();
 
 		byte[] bytes = baos.toByteArray();