You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:43 UTC
[flink] 04/07: [FLINK-16178][refactor] Make the versioned Checkpoint Metadata Serializers only responsible for deserialization.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f575a8268e5008deb2ccb75cd273d95eaa60b2d0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 12:57:51 2020 +0100
[FLINK-16178][refactor] Make the versioned Checkpoint Metadata Serializers only responsible for deserialization.
Serialization always happens with the latest version anyway.
---
.../state/api/output/MergeOperatorStates.java | 6 ++---
.../state/api/output/SavepointOutputFormat.java | 5 ++--
.../api/output/SavepointOutputFormatTest.java | 2 +-
.../flink/runtime/checkpoint/Checkpoints.java | 17 ++++++------
.../runtime/checkpoint/PendingCheckpoint.java | 3 +--
.../checkpoint/savepoint/SavepointSerializer.java | 15 ++---------
.../checkpoint/savepoint/SavepointSerializers.java | 30 +++++-----------------
.../savepoint/SavepointV1Serializer.java | 10 ++------
.../savepoint/SavepointV2Serializer.java | 8 +++---
9 files changed, 29 insertions(+), 67 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
index d5789a3..6a80268 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -35,7 +35,7 @@ import java.util.stream.StreamSupport;
* A reducer that aggregates multiple {@link OperatorState}'s into a single {@link Savepoint}.
*/
@Internal
-public class MergeOperatorStates implements GroupReduceFunction<OperatorState, Savepoint> {
+public class MergeOperatorStates implements GroupReduceFunction<OperatorState, SavepointV2> {
private static final long serialVersionUID = 1L;
@@ -48,8 +48,8 @@ public class MergeOperatorStates implements GroupReduceFunction<OperatorState, S
}
@Override
- public void reduce(Iterable<OperatorState> values, Collector<Savepoint> out) {
- Savepoint savepoint =
+ public void reduce(Iterable<OperatorState> values, Collector<SavepointV2> out) {
+ SavepointV2 savepoint =
new SavepointV2(
SnapshotUtils.CHECKPOINT_ID,
StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList()),
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
index 190de52..f7c9366 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -45,7 +46,7 @@ import java.io.IOException;
* <p>This format may only be executed with parallelism 1.
*/
@Internal
-public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
+public class SavepointOutputFormat extends RichOutputFormat<SavepointV2> {
private static final long serialVersionUID = 1L;
@@ -70,7 +71,7 @@ public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
}
@Override
- public void writeRecord(Savepoint savepoint) throws IOException {
+ public void writeRecord(SavepointV2 savepoint) throws IOException {
String path = LambdaUtil.withContextClassLoader(getRuntimeContext().getUserCodeClassLoader(), () -> {
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
Checkpoints.storeCheckpointMetadata(savepoint, out);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
index 7cc4b2f..cbc76ea 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -56,7 +56,7 @@ public class SavepointOutputFormatTest {
Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
SavepointOutputFormat format = createSavepointOutputFormat(path);
- Savepoint savepoint = createSavepoint();
+ SavepointV2 savepoint = createSavepoint();
format.open(0, 1);
format.writeRecord(savepoint);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index a67b720..fa84742 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -70,25 +71,23 @@ public class Checkpoints {
// Writing out checkpoint metadata
// ------------------------------------------------------------------------
- public static <T extends Savepoint> void storeCheckpointMetadata(
- T checkpointMetadata,
+ public static void storeCheckpointMetadata(
+ SavepointV2 checkpointMetadata,
OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
storeCheckpointMetadata(checkpointMetadata, dos);
}
- public static <T extends Savepoint> void storeCheckpointMetadata(
- T checkpointMetadata,
+ public static void storeCheckpointMetadata(
+ SavepointV2 checkpointMetadata,
DataOutputStream out) throws IOException {
// write generic header
out.writeInt(HEADER_MAGIC_NUMBER);
- out.writeInt(checkpointMetadata.getVersion());
+ out.writeInt(SavepointV2.VERSION);
- // write checkpoint metadata
- SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(checkpointMetadata);
- serializer.serialize(checkpointMetadata, out);
+ SavepointV2Serializer.serialize(checkpointMetadata, out);
}
// ------------------------------------------------------------------------
@@ -103,7 +102,7 @@ public class Checkpoints {
if (magicNumber == HEADER_MAGIC_NUMBER) {
final int version = in.readInt();
- final SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+ final SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
return serializer.deserialize(in, classLoader);
}
else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1110926..352a2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -300,7 +299,7 @@ public class PendingCheckpoint {
// make sure we fulfill the promise with an exception if something fails
try {
// write out the metadata
- final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
+ final SavepointV2 savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
final CompletedCheckpointStorageLocation finalizedLocation;
try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 6b0113f..5ac34d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
/**
@@ -28,16 +27,7 @@ import java.io.IOException;
*
* <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
*/
-public interface SavepointSerializer<T extends Savepoint> {
-
- /**
- * Serializes a savepoint to an output stream.
- *
- * @param savepoint Savepoint to serialize
- * @param dos Output stream to serialize the savepoint to
- * @throws IOException Serialization failures are forwarded
- */
- void serialize(T savepoint, DataOutputStream dos) throws IOException;
+public interface SavepointSerializer {
/**
* Deserializes a savepoint from an input stream.
@@ -47,6 +37,5 @@ public interface SavepointSerializer<T extends Savepoint> {
* @return The deserialized savepoint
* @throws IOException Serialization failures are forwarded
*/
- T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
-
+ SavepointV2 deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 336f17a..256aee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -19,20 +19,21 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.Map;
/**
- * Helper to access {@link SavepointSerializer} for a specific savepoint version.
+ * Helper to access {@link SavepointSerializer}s for specific format versions.
+ *
+ * <p>The serializer for a specific version can be obtained via {@link #getSerializer(int)}.
*/
public class SavepointSerializers {
/** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
static boolean failWhenLegacyStateDetected = true;
- private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
+ private static final Map<Integer, SavepointSerializer> SERIALIZERS = new HashMap<>(2);
static {
SERIALIZERS.put(SavepointV1Serializer.VERSION, SavepointV1Serializer.INSTANCE);
@@ -46,33 +47,14 @@ public class SavepointSerializers {
// ------------------------------------------------------------------------
/**
- * Returns the {@link SavepointSerializer} for the given savepoint.
- *
- * @param savepoint Savepoint to get serializer for
- * @param <T> Type of savepoint
- * @return Savepoint serializer for the savepoint
- * @throws IllegalArgumentException If unknown savepoint version
- */
- @SuppressWarnings("unchecked")
- public static <T extends Savepoint> SavepointSerializer<T> getSerializer(T savepoint) {
- Preconditions.checkNotNull(savepoint, "Savepoint");
- SavepointSerializer<T> serializer = (SavepointSerializer<T>) SERIALIZERS.get(savepoint.getVersion());
- if (serializer != null) {
- return serializer;
- } else {
- throw new IllegalArgumentException("Unknown savepoint version " + savepoint.getVersion() + ".");
- }
- }
-
- /**
* Returns the {@link SavepointSerializer} for the given savepoint version.
*
* @param version Savepoint version to get serializer for
* @return Savepoint for the given version
* @throws IllegalArgumentException If unknown savepoint version
*/
- public static SavepointSerializer<?> getSerializer(int version) {
- SavepointSerializer<?> serializer = SERIALIZERS.get(version);
+ public static SavepointSerializer getSerializer(int version) {
+ SavepointSerializer serializer = SERIALIZERS.get(version);
if (serializer != null) {
return serializer;
} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 0bc7551..e0dd8b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -54,7 +54,7 @@ import java.util.Map;
*/
@Internal
@SuppressWarnings("deprecation")
-public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
+public class SavepointV1Serializer implements SavepointSerializer {
/** The savepoint version. */
public static final int VERSION = 1;
@@ -67,13 +67,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
- private SavepointV1Serializer() {
- }
-
- @Override
- public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
- throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
- }
+ private SavepointV1Serializer() {}
@Override
public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 593bcf6..3d60e22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -70,8 +70,7 @@ import java.util.UUID;
* </pre>
*/
@Internal
-@VisibleForTesting
-public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+public class SavepointV2Serializer implements SavepointSerializer {
/** Random magic number for consistency checks. */
private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
@@ -95,8 +94,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
// (De)serialization entry points
// ------------------------------------------------------------------------
- @Override
- public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+ public static void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
// first: checkpoint ID
dos.writeLong(checkpointMetadata.getCheckpointId());
@@ -189,7 +187,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
// master state (de)serialization methods
// ------------------------------------------------------------------------
- private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+ private static void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
// magic number for error detection
dos.writeInt(MASTER_STATE_MAGIC_NUMBER);