You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/03/05 14:52:29 UTC
[flink] 04/06: [hotfix][checkpointing] Minor code cleanups for
'checkpoint.metadata' test classes
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit add072264a4415be774099192408c13cfbfca320
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 27 15:39:26 2020 +0100
[hotfix][checkpointing] Minor code cleanups for 'checkpoint.metadata' test classes
- fix checkstyle
- Adjust names of tests with renamed main scope classes
- remove test util methods that are no longer used after Legacy State was dropped.
---
...dataV2Test.java => CheckpointMetadataTest.java} | 10 +-
.../checkpoint/metadata/CheckpointTestUtils.java | 101 +--------------------
.../metadata/MetadataV3SerializerTest.java | 4 +-
3 files changed, 10 insertions(+), 105 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
similarity index 92%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
index ff93d5d..f4cc7d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointMetadataTest.java
@@ -30,13 +30,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-public class MetadataV2Test {
+/**
+ * Simple tests for the {@link CheckpointMetadata} data holder class.
+ */
+public class CheckpointMetadataTest {
- /**
- * Simple test of savepoint methods.
- */
@Test
- public void testSavepointV2() throws Exception {
+ public void testConstructAndDispose() throws Exception {
final Random rnd = new Random();
final long checkpointId = rnd.nextInt(Integer.MAX_VALUE) + 1;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index b526d05..f411a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -22,11 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -59,13 +55,6 @@ public class CheckpointTestUtils {
/**
* Creates a random collection of OperatorState objects containing various types of state handles.
*/
- public static Collection<OperatorState> createOperatorStates(int numTaskStates, int numSubtasksPerTask) {
- return createOperatorStates(new Random(), numTaskStates, numSubtasksPerTask);
- }
-
- /**
- * Creates a random collection of OperatorState objects containing various types of state handles.
- */
public static Collection<OperatorState> createOperatorStates(
Random random,
int numTaskStates,
@@ -93,7 +82,7 @@ public class CheckpointTestUtils {
OperatorStateHandle operatorStateHandleBackend = null;
OperatorStateHandle operatorStateHandleStream = null;
-
+
Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
@@ -136,89 +125,6 @@ public class CheckpointTestUtils {
}
/**
- * Creates a random collection of TaskState objects containing various types of state handles.
- */
- public static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtasksPerTask) {
- return createTaskStates(new Random(), numTaskStates, numSubtasksPerTask);
- }
-
- /**
- * Creates a random collection of TaskState objects containing various types of state handles.
- */
- public static Collection<TaskState> createTaskStates(
- Random random,
- int numTaskStates,
- int numSubtasksPerTask) {
-
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int stateIdx = 0; stateIdx < numTaskStates; ++stateIdx) {
-
- int chainLength = 1 + random.nextInt(8);
-
- TaskState taskState = new TaskState(new JobVertexID(), numSubtasksPerTask, 128, chainLength);
-
- int noNonPartitionableStateAtIndex = random.nextInt(chainLength);
- int noOperatorStateBackendAtIndex = random.nextInt(chainLength);
- int noOperatorStateStreamAtIndex = random.nextInt(chainLength);
-
- boolean hasKeyedBackend = random.nextInt(4) != 0;
- boolean hasKeyedStream = random.nextInt(4) != 0;
-
- for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
-
- List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(chainLength);
- List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(chainLength);
-
- for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
-
- StreamStateHandle operatorStateBackend =
- new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
- StreamStateHandle operatorStateStream =
- new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
- Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
- offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
- offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
- offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.UNION));
-
- if (chainIdx != noOperatorStateBackendAtIndex) {
- OperatorStateHandle operatorStateHandleBackend =
- new OperatorStreamStateHandle(offsetsMap, operatorStateBackend);
- operatorStatesBackend.add(operatorStateHandleBackend);
- }
-
- if (chainIdx != noOperatorStateStreamAtIndex) {
- OperatorStateHandle operatorStateHandleStream =
- new OperatorStreamStateHandle(offsetsMap, operatorStateStream);
- operatorStatesStream.add(operatorStateHandleStream);
- }
- }
-
- KeyGroupsStateHandle keyedStateBackend = null;
- KeyGroupsStateHandle keyedStateStream = null;
-
- if (hasKeyedBackend) {
- keyedStateBackend = createDummyKeyGroupStateHandle(random);
- }
-
- if (hasKeyedStream) {
- keyedStateStream = createDummyKeyGroupStateHandle(random);
- }
-
- taskState.putState(subtaskIdx, new SubtaskState(
- new ChainedStateHandle<>(operatorStatesBackend),
- new ChainedStateHandle<>(operatorStatesStream),
- keyedStateStream,
- keyedStateBackend));
- }
-
- taskStates.add(taskState);
- }
-
- return taskStates;
- }
-
- /**
* Creates a bunch of random master states.
*/
public static Collection<MasterState> createRandomMasterStates(Random random, int num) {
@@ -238,7 +144,7 @@ public class CheckpointTestUtils {
/**
* Asserts that two MasterStates are equal.
- *
+ *
* <p>The MasterState avoids overriding {@code equals()} on purpose, because equality is not well
* defined in the raw contents.
*/
@@ -251,10 +157,9 @@ public class CheckpointTestUtils {
// ------------------------------------------------------------------------
- /** utility class, not meant to be instantiated */
+ /** utility class, not meant to be instantiated. */
private CheckpointTestUtils() {}
-
public static IncrementalRemoteKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
return new IncrementalRemoteKeyedStateHandle(
createRandomUUID(rnd),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
index 0147e54..9e4456c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3SerializerTest.java
@@ -38,7 +38,7 @@ import java.util.Random;
import static org.junit.Assert.assertEquals;
/**
- * Various tests for the version 2 format serializer of a checkpoint.
+ * Various tests for the version 3 format serializer of a checkpoint.
*/
public class MetadataV3SerializerTest {
@@ -127,7 +127,7 @@ public class MetadataV3SerializerTest {
ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
DataOutputStream out = new DataOutputViewStreamWrapper(baos);
- serializer.serialize(new CheckpointMetadata(checkpointId, operatorStates, masterStates), out);
+ MetadataV3Serializer.serialize(new CheckpointMetadata(checkpointId, operatorStates, masterStates), out);
out.close();
byte[] bytes = baos.toByteArray();