You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:41 UTC
[flink] 02/07: [FLINK-16178][refactor] Remove SavepointV1 class and
move relevant code to test scope
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ae98ffe13cb6f90024422a42cdc3303846cd96f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 12:10:43 2020 +0100
[FLINK-16178][refactor] Remove SavepointV1 class and move relevant code to test scope
The class was a shell used for tests, but it was confusing and cluttered the main scope.
---
.../checkpoint/savepoint/SavepointSerializer.java | 9 +-
.../checkpoint/savepoint/SavepointSerializers.java | 3 +-
.../runtime/checkpoint/savepoint/SavepointV1.java | 87 -------
.../savepoint/SavepointV1Serializer.java | 266 ++++++++++-----------
.../savepoint/SavepointV1SerializerTest.java | 32 +--
5 files changed, 151 insertions(+), 246 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 71dee6b..6b0113f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -18,20 +18,15 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import org.apache.flink.runtime.checkpoint.Checkpoints;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
- * Serializer for {@link Savepoint} instances.
- *
- * <p>This serializer is used to read/write a savepoint via {@link Checkpoints}.
+ * Deserializer for checkpoint metadata. Different deserializers exist to deserialize from different
+ * format versions.
*
* <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
- *
- * @param <T> Savepoint type to serialize.
*/
public interface SavepointSerializer<T extends Savepoint> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 25b63b6..300dee2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -35,7 +35,7 @@ public class SavepointSerializers {
private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
static {
- SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
+ SERIALIZERS.put(SavepointV1Serializer.VERSION, SavepointV1Serializer.INSTANCE);
SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE);
}
@@ -71,7 +71,6 @@ public class SavepointSerializers {
* @return Savepoint for the given version
* @throws IllegalArgumentException If unknown savepoint version
*/
- @SuppressWarnings("unchecked")
public static SavepointSerializer<?> getSerializer(int version) {
SavepointSerializer<?> serializer = SERIALIZERS.get(version);
if (serializer != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
deleted file mode 100644
index 69e7695..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-public class SavepointV1 implements Savepoint {
-
- /** The savepoint version. */
- public static final int VERSION = 1;
-
- /** The checkpoint ID. */
- private final long checkpointId;
-
- /** The task states. */
- private final Collection<TaskState> taskStates;
-
- public SavepointV1(long checkpointId, Collection<TaskState> taskStates) {
- this.checkpointId = checkpointId;
- this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public long getCheckpointId() {
- return checkpointId;
- }
-
- @Override
- public Collection<TaskState> getTaskStates() {
- return taskStates;
- }
-
- @Override
- public Collection<MasterState> getMasterStates() {
- // since checkpoints are never deserialized into this format,
- // this method should never be called
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<OperatorState> getOperatorStates() {
- return null;
- }
-
- @Override
- public void dispose() throws Exception {
- // since checkpoints are never deserialized into this format,
- // this method should never be called
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String toString() {
- return "Savepoint(version=" + VERSION + ")";
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index e56d4be..0bc7551 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -56,6 +56,9 @@ import java.util.Map;
@SuppressWarnings("deprecation")
public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
+ /** The savepoint version. */
+ public static final int VERSION = 1;
+
private static final byte NULL_HANDLE = 0;
private static final byte BYTE_STREAM_STATE_HANDLE = 1;
private static final byte FILE_STREAM_STATE_HANDLE = 2;
@@ -103,66 +106,6 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
return new SavepointV2(checkpointId, taskStates);
}
- public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
- dos.writeLong(savepoint.getCheckpointId());
-
- Collection<TaskState> taskStates = savepoint.getTaskStates();
- dos.writeInt(taskStates.size());
-
- for (TaskState taskState : savepoint.getTaskStates()) {
- // Vertex ID
- dos.writeLong(taskState.getJobVertexID().getLowerPart());
- dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
- // Parallelism
- int parallelism = taskState.getParallelism();
- dos.writeInt(parallelism);
- dos.writeInt(taskState.getMaxParallelism());
- dos.writeInt(taskState.getChainLength());
-
- // Sub task states
- Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
- dos.writeInt(subtaskStateMap.size());
- for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
- dos.writeInt(entry.getKey());
- serializeSubtaskState(entry.getValue(), dos);
- }
- }
- }
-
- private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
-
- //backwards compatibility, do not remove
- dos.writeLong(-1L);
-
- //backwards compatibility (number of legacy state handles), do not remove
- dos.writeInt(0);
-
- ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
-
- int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
- dos.writeInt(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle stateHandle = operatorStateBackend.get(i);
- serializeOperatorStateHandle(stateHandle, dos);
- }
-
- ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
-
- len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
- dos.writeInt(len);
- for (int i = 0; i < len; ++i) {
- OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
- serializeOperatorStateHandle(stateHandle, dos);
- }
-
- KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
- serializeKeyedStateHandle(keyedStateBackend, dos);
-
- KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
- serializeKeyedStateHandle(keyedStateStream, dos);
- }
-
private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
// Duration field has been removed from SubtaskState
dis.readLong();
@@ -213,29 +156,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
keyedStateStream);
}
- @VisibleForTesting
- public static void serializeKeyedStateHandle(
- KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
- if (stateHandle == null) {
- dos.writeByte(NULL_HANDLE);
- } else if (stateHandle instanceof KeyGroupsStateHandle) {
- KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
-
- dos.writeByte(KEY_GROUPS_HANDLE);
- dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
- dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
- for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
- dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
- }
- serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
- } else {
- throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
- }
- }
-
- @VisibleForTesting
- public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+ private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -256,39 +177,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
- @VisibleForTesting
- public static void serializeOperatorStateHandle(
- OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
-
- if (stateHandle != null) {
- dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
- Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
- stateHandle.getStateNameToPartitionOffsets();
- dos.writeInt(partitionOffsetsMap.size());
- for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
- dos.writeUTF(entry.getKey());
-
- OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
-
- int mode = stateMetaInfo.getDistributionMode().ordinal();
- dos.writeByte(mode);
-
- long[] offsets = stateMetaInfo.getOffsets();
- dos.writeInt(offsets.length);
- for (long offset : offsets) {
- dos.writeLong(offset);
- }
- }
- serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
- } else {
- dos.writeByte(NULL_HANDLE);
- }
- }
-
- @VisibleForTesting
- public static OperatorStateHandle deserializeOperatorStateHandle(
- DataInputStream dis) throws IOException {
-
+ private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -317,9 +206,90 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
+ private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+ int type = dis.read();
+ if (NULL_HANDLE == type) {
+ return null;
+ } else if (FILE_STREAM_STATE_HANDLE == type) {
+ long size = dis.readLong();
+ String pathString = dis.readUTF();
+ return new FileStateHandle(new Path(pathString), size);
+ } else if (BYTE_STREAM_STATE_HANDLE == type) {
+ String handleName = dis.readUTF();
+ int numBytes = dis.readInt();
+ byte[] data = new byte[numBytes];
+ dis.readFully(data);
+ return new ByteStreamStateHandle(handleName, data);
+ } else {
+ throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // old format serialization, for testing
+ // ------------------------------------------------------------------------
+
@VisibleForTesting
- public static void serializeStreamStateHandle(
- StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+ public static void serializeVersion1(long checkpointId, Collection<TaskState> taskStates, DataOutputStream dos) throws IOException {
+ dos.writeLong(checkpointId);
+
+ dos.writeInt(taskStates.size());
+
+ for (TaskState taskState : taskStates) {
+ // Vertex ID
+ dos.writeLong(taskState.getJobVertexID().getLowerPart());
+ dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+ // Parallelism
+ int parallelism = taskState.getParallelism();
+ dos.writeInt(parallelism);
+ dos.writeInt(taskState.getMaxParallelism());
+ dos.writeInt(taskState.getChainLength());
+
+ // Sub task states
+ Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+ dos.writeInt(subtaskStateMap.size());
+ for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+ dos.writeInt(entry.getKey());
+ serializeSubtaskState(entry.getValue(), dos);
+ }
+ }
+ }
+
+ private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+ //backwards compatibility, do not remove
+ dos.writeLong(-1L);
+
+ //backwards compatibility (number of legacy state handles), do not remove
+ dos.writeInt(0);
+
+ ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+ int len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+ dos.writeInt(len);
+ for (int i = 0; i < len; ++i) {
+ OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+ serializeOperatorStateHandle(stateHandle, dos);
+ }
+
+ ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+ len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+ dos.writeInt(len);
+ for (int i = 0; i < len; ++i) {
+ OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+ serializeOperatorStateHandle(stateHandle, dos);
+ }
+
+ KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+ serializeKeyedStateHandle(keyedStateBackend, dos);
+
+ KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+ serializeKeyedStateHandle(keyedStateStream, dos);
+ }
+
+ private static void serializeStreamStateHandle(
+ StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
dos.writeByte(NULL_HANDLE);
@@ -345,23 +315,51 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
dos.flush();
}
- @VisibleForTesting
- public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
- int type = dis.read();
- if (NULL_HANDLE == type) {
- return null;
- } else if (FILE_STREAM_STATE_HANDLE == type) {
- long size = dis.readLong();
- String pathString = dis.readUTF();
- return new FileStateHandle(new Path(pathString), size);
- } else if (BYTE_STREAM_STATE_HANDLE == type) {
- String handleName = dis.readUTF();
- int numBytes = dis.readInt();
- byte[] data = new byte[numBytes];
- dis.readFully(data);
- return new ByteStreamStateHandle(handleName, data);
+ private static void serializeKeyedStateHandle(
+ KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+ if (stateHandle == null) {
+ dos.writeByte(NULL_HANDLE);
+ } else if (stateHandle instanceof KeyGroupsStateHandle) {
+ KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+ dos.writeByte(KEY_GROUPS_HANDLE);
+ dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+ dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+ for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+ dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+ }
+ serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
} else {
- throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+ throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+ }
+ }
+
+ private static void serializeOperatorStateHandle(
+ OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+ if (stateHandle != null) {
+ dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+ Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+ stateHandle.getStateNameToPartitionOffsets();
+ dos.writeInt(partitionOffsetsMap.size());
+ for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+ dos.writeUTF(entry.getKey());
+
+ OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+ int mode = stateMetaInfo.getDistributionMode().ordinal();
+ dos.writeByte(mode);
+
+ long[] offsets = stateMetaInfo.getOffsets();
+ dos.writeInt(offsets.length);
+ for (long offset : offsets) {
+ dos.writeLong(offset);
+ }
+ }
+ serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+ } else {
+ dos.writeByte(NULL_HANDLE);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
index 0eff7bc..da448f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -21,43 +21,43 @@ package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.util.Collection;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Test that the Checkpoint Metadata V1 deserializer can deserialize the metadata correctly
+ * into the latest format.
+ */
public class SavepointV1SerializerTest {
- /**
- * Test serialization of {@link SavepointV1} instance.
- */
@Test
public void testSerializeDeserializeV1() throws Exception {
final Random r = new Random(42);
for (int i = 0; i < 50; ++i) {
- SavepointV1 expected =
- new SavepointV1(i+ 123123, CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64)));
-
- SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
+ final long checkpointId = i + 123123;
+ final Collection<TaskState> taskStates = CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64));
// Serialize
- ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
- serializer.serializeOld(expected, new DataOutputViewStreamWrapper(baos));
- byte[] bytes = baos.toByteArray();
+ final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
+ SavepointV1Serializer.serializeVersion1(checkpointId, taskStates, new DataOutputViewStreamWrapper(baos));
+ final byte[] bytes = baos.toByteArray();
// Deserialize
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- SavepointV2 actual = serializer.deserialize(
- new DataInputViewStreamWrapper(bais),
+ final SavepointV2 actual = SavepointV1Serializer.INSTANCE.deserialize(
+ new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)),
Thread.currentThread().getContextClassLoader());
-
- assertEquals(expected.getCheckpointId(), actual.getCheckpointId());
- assertEquals(expected.getTaskStates(), actual.getTaskStates());
+ assertEquals(checkpointId, actual.getCheckpointId());
+ assertEquals(taskStates, actual.getTaskStates());
assertTrue(actual.getMasterStates().isEmpty());
}
}