Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:41 UTC

[flink] 02/07: [FLINK-16178][refactor] Remove SavepointV1 class and move relevant code to test scope

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ae98ffe13cb6f90024422a42cdc3303846cd96f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 12:10:43 2020 +0100

    [FLINK-16178][refactor] Remove SavepointV1 class and move relevant code to test scope
    
    The class was a shell used for tests, but it was confusing and cluttered the main scope.
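    
    As a rough sketch of what the move means for callers (only classes and the
    static serializeVersion1 helper from this diff are used; the wrapper class
    name below is invented for illustration), test code now writes the legacy
    version-1 metadata through the static helper instead of constructing a
    SavepointV1 instance:
    
        import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
        import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
        import org.apache.flink.runtime.checkpoint.TaskState;
        import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
    
        import java.util.Collection;
    
        class V1MetadataWriteSketch {
            static byte[] writeV1Metadata(long checkpointId, Collection<TaskState> taskStates) throws Exception {
                // Previously: serializer.serializeOld(new SavepointV1(checkpointId, taskStates), dos).
                // Now the test-scoped static helper writes the version-1 layout directly.
                ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
                SavepointV1Serializer.serializeVersion1(checkpointId, taskStates, new DataOutputViewStreamWrapper(out));
                return out.toByteArray();
            }
        }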
---
 .../checkpoint/savepoint/SavepointSerializer.java  |   9 +-
 .../checkpoint/savepoint/SavepointSerializers.java |   3 +-
 .../runtime/checkpoint/savepoint/SavepointV1.java  |  87 -------
 .../savepoint/SavepointV1Serializer.java           | 266 ++++++++++-----------
 .../savepoint/SavepointV1SerializerTest.java       |  32 +--
 5 files changed, 151 insertions(+), 246 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 71dee6b..6b0113f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -18,20 +18,15 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.runtime.checkpoint.Checkpoints;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
 /**
- * Serializer for {@link Savepoint} instances.
- *
- * <p>This serializer is used to read/write a savepoint via {@link Checkpoints}.
+ * Deserializer for checkpoint metadata. Different deserializers exist to deserialize from different
+ * format versions.
  *
  * <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
- *
- * @param <T> Savepoint type to serialize.
  */
 public interface SavepointSerializer<T extends Savepoint> {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 25b63b6..300dee2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -35,7 +35,7 @@ public class SavepointSerializers {
 	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
 
 	static {
-		SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
+		SERIALIZERS.put(SavepointV1Serializer.VERSION, SavepointV1Serializer.INSTANCE);
 		SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE);
 	}
 
@@ -71,7 +71,6 @@ public class SavepointSerializers {
 	 * @return Savepoint for the given version
 	 * @throws IllegalArgumentException If unknown savepoint version
 	 */
-	@SuppressWarnings("unchecked")
 	public static SavepointSerializer<?> getSerializer(int version) {
 		SavepointSerializer<?> serializer = SERIALIZERS.get(version);
 		if (serializer != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
deleted file mode 100644
index 69e7695..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-public class SavepointV1 implements Savepoint {
-
-	/** The savepoint version. */
-	public static final int VERSION = 1;
-
-	/** The checkpoint ID. */
-	private final long checkpointId;
-
-	/** The task states. */
-	private final Collection<TaskState> taskStates;
-
-	public SavepointV1(long checkpointId, Collection<TaskState> taskStates) {
-		this.checkpointId = checkpointId;
-		this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-
-	@Override
-	public long getCheckpointId() {
-		return checkpointId;
-	}
-
-	@Override
-	public Collection<TaskState> getTaskStates() {
-		return taskStates;
-	}
-
-	@Override
-	public Collection<MasterState> getMasterStates() {
-		// since checkpoints are never deserialized into this format,
-		// this method should never be called
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Collection<OperatorState> getOperatorStates() {
-		return null;
-	}
-
-	@Override
-	public void dispose() throws Exception {
-		// since checkpoints are never deserialized into this format,
-		// this method should never be called
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public String toString() {
-		return "Savepoint(version=" + VERSION + ")";
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index e56d4be..0bc7551 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -56,6 +56,9 @@ import java.util.Map;
 @SuppressWarnings("deprecation")
 public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 
+	/** The savepoint version. */
+	public static final int VERSION = 1;
+
 	private static final byte NULL_HANDLE = 0;
 	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
 	private static final byte FILE_STREAM_STATE_HANDLE = 2;
@@ -103,66 +106,6 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		return new SavepointV2(checkpointId, taskStates);
 	}
 
-	public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
-		dos.writeLong(savepoint.getCheckpointId());
-
-		Collection<TaskState> taskStates = savepoint.getTaskStates();
-		dos.writeInt(taskStates.size());
-
-		for (TaskState taskState : savepoint.getTaskStates()) {
-			// Vertex ID
-			dos.writeLong(taskState.getJobVertexID().getLowerPart());
-			dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-			// Parallelism
-			int parallelism = taskState.getParallelism();
-			dos.writeInt(parallelism);
-			dos.writeInt(taskState.getMaxParallelism());
-			dos.writeInt(taskState.getChainLength());
-
-			// Sub task states
-			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
-			dos.writeInt(subtaskStateMap.size());
-			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
-				dos.writeInt(entry.getKey());
-				serializeSubtaskState(entry.getValue(), dos);
-			}
-		}
-	}
-
-	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
-
-		//backwards compatibility, do not remove
-		dos.writeLong(-1L);
-
-		//backwards compatibility (number of legacy state handles), do not remove
-		dos.writeInt(0);
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
-
-		int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
-		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
-		}
-
-		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
-
-		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
-		dos.writeInt(len);
-		for (int i = 0; i < len; ++i) {
-			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
-			serializeOperatorStateHandle(stateHandle, dos);
-		}
-
-		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
-		serializeKeyedStateHandle(keyedStateBackend, dos);
-
-		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
-		serializeKeyedStateHandle(keyedStateStream, dos);
-	}
-
 	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
 		// Duration field has been removed from SubtaskState
 		dis.readLong();
@@ -213,29 +156,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 				keyedStateStream);
 	}
 
-	@VisibleForTesting
-	public static void serializeKeyedStateHandle(
-			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
-		if (stateHandle == null) {
-			dos.writeByte(NULL_HANDLE);
-		} else if (stateHandle instanceof KeyGroupsStateHandle) {
-			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
-
-			dos.writeByte(KEY_GROUPS_HANDLE);
-			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
-			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
-				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
-			}
-			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
-		} else {
-			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
-		}
-	}
-
-	@VisibleForTesting
-	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -256,39 +177,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
-	@VisibleForTesting
-	public static void serializeOperatorStateHandle(
-		OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
-		if (stateHandle != null) {
-			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
-			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
-					stateHandle.getStateNameToPartitionOffsets();
-			dos.writeInt(partitionOffsetsMap.size());
-			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
-				dos.writeUTF(entry.getKey());
-
-				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
-
-				int mode = stateMetaInfo.getDistributionMode().ordinal();
-				dos.writeByte(mode);
-
-				long[] offsets = stateMetaInfo.getOffsets();
-				dos.writeInt(offsets.length);
-				for (long offset : offsets) {
-					dos.writeLong(offset);
-				}
-			}
-			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
-		} else {
-			dos.writeByte(NULL_HANDLE);
-		}
-	}
-
-	@VisibleForTesting
-	public static OperatorStateHandle deserializeOperatorStateHandle(
-			DataInputStream dis) throws IOException {
-
+	private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -317,9 +206,90 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
+	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		int type = dis.read();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			long size = dis.readLong();
+			String pathString = dis.readUTF();
+			return new FileStateHandle(new Path(pathString), size);
+		} else if (BYTE_STREAM_STATE_HANDLE == type) {
+			String handleName = dis.readUTF();
+			int numBytes = dis.readInt();
+			byte[] data = new byte[numBytes];
+			dis.readFully(data);
+			return new ByteStreamStateHandle(handleName, data);
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  old format serialization, for testing
+	// ------------------------------------------------------------------------
+
 	@VisibleForTesting
-	public static void serializeStreamStateHandle(
-			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+	public static void serializeVersion1(long checkpointId, Collection<TaskState> taskStates, DataOutputStream dos) throws IOException {
+		dos.writeLong(checkpointId);
+
+		dos.writeInt(taskStates.size());
+
+		for (TaskState taskState : taskStates) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+			dos.writeInt(taskState.getMaxParallelism());
+			dos.writeInt(taskState.getChainLength());
+
+			// Sub task states
+			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+			dos.writeInt(subtaskStateMap.size());
+			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+				dos.writeInt(entry.getKey());
+				serializeSubtaskState(entry.getValue(), dos);
+			}
+		}
+	}
+
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+		//backwards compatibility, do not remove
+		dos.writeLong(-1L);
+
+		//backwards compatibility (number of legacy state handles), do not remove
+		dos.writeInt(0);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+		int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+		serializeKeyedStateHandle(keyedStateBackend, dos);
+
+		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+		serializeKeyedStateHandle(keyedStateStream, dos);
+	}
+
+	private static void serializeStreamStateHandle(
+		StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle == null) {
 			dos.writeByte(NULL_HANDLE);
@@ -345,23 +315,51 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		dos.flush();
 	}
 
-	@VisibleForTesting
-	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
-		int type = dis.read();
-		if (NULL_HANDLE == type) {
-			return null;
-		} else if (FILE_STREAM_STATE_HANDLE == type) {
-			long size = dis.readLong();
-			String pathString = dis.readUTF();
-			return new FileStateHandle(new Path(pathString), size);
-		} else if (BYTE_STREAM_STATE_HANDLE == type) {
-			String handleName = dis.readUTF();
-			int numBytes = dis.readInt();
-			byte[] data = new byte[numBytes];
-			dis.readFully(data);
-			return new ByteStreamStateHandle(handleName, data);
+	private static void serializeKeyedStateHandle(
+		KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+		} else if (stateHandle instanceof KeyGroupsStateHandle) {
+			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+			dos.writeByte(KEY_GROUPS_HANDLE);
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+			}
+			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
 		} else {
-			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+		}
+	}
+
+	private static void serializeOperatorStateHandle(
+		OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle != null) {
+			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+				stateHandle.getStateNameToPartitionOffsets();
+			dos.writeInt(partitionOffsetsMap.size());
+			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+				dos.writeUTF(entry.getKey());
+
+				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+				int mode = stateMetaInfo.getDistributionMode().ordinal();
+				dos.writeByte(mode);
+
+				long[] offsets = stateMetaInfo.getOffsets();
+				dos.writeInt(offsets.length);
+				for (long offset : offsets) {
+					dos.writeLong(offset);
+				}
+			}
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
index 0eff7bc..da448f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -21,43 +21,43 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.util.Collection;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Test that the Checkpoint Metadata V1 deserializer can deserialize the metadata correctly
+ * into the latest format.
+ */
 public class SavepointV1SerializerTest {
 
-	/**
-	 * Test serialization of {@link SavepointV1} instance.
-	 */
 	@Test
 	public void testSerializeDeserializeV1() throws Exception {
 		final Random r = new Random(42);
 
 		for (int i = 0; i < 50; ++i) {
-			SavepointV1 expected =
-					new SavepointV1(i+ 123123, CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64)));
-
-			SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
+			final long checkpointId = i + 123123;
+			final Collection<TaskState> taskStates = CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64));
 
 			// Serialize
-			ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
-			serializer.serializeOld(expected, new DataOutputViewStreamWrapper(baos));
-			byte[] bytes = baos.toByteArray();
+			final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
+			SavepointV1Serializer.serializeVersion1(checkpointId, taskStates, new DataOutputViewStreamWrapper(baos));
+			final byte[] bytes = baos.toByteArray();
 
 			// Deserialize
-			ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-			SavepointV2 actual = serializer.deserialize(
-					new DataInputViewStreamWrapper(bais),
+			final SavepointV2 actual = SavepointV1Serializer.INSTANCE.deserialize(
+					new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)),
 					Thread.currentThread().getContextClassLoader());
 
-
-			assertEquals(expected.getCheckpointId(), actual.getCheckpointId());
-			assertEquals(expected.getTaskStates(), actual.getTaskStates());
+			assertEquals(checkpointId, actual.getCheckpointId());
+			assertEquals(taskStates, actual.getTaskStates());
 			assertTrue(actual.getMasterStates().isEmpty());
 		}
 	}
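
A round-trip sketch, mirroring the updated test above (the helper class name is
invented for illustration): bytes produced by serializeVersion1 are read back
through the V1 serializer, which always deserializes into the latest in-memory
representation, SavepointV2:

    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
    import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;

    import java.io.ByteArrayInputStream;

    class V1MetadataReadSketch {
        static SavepointV2 readV1Metadata(byte[] bytes) throws Exception {
            // The format version constant now lives on the serializer itself
            // (SavepointV1Serializer.VERSION, registered in SavepointSerializers).
            return SavepointV1Serializer.INSTANCE.deserialize(
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)),
                    Thread.currentThread().getContextClassLoader());
        }
    }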