You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/25 22:55:43 UTC

[flink] 04/07: [FLINK-16178][refactor] Make the versioned Checkpoint Metadata Serializers only responsible for deserialization.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f575a8268e5008deb2ccb75cd273d95eaa60b2d0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Feb 20 12:57:51 2020 +0100

    [FLINK-16178][refactor] Make the versioned Checkpoint Metadata Serializers only responsible for deserialization.
    
    Serialization always happens with the latest format version anyway, so only deserialization needs to be version-specific.
---
 .../state/api/output/MergeOperatorStates.java      |  6 ++---
 .../state/api/output/SavepointOutputFormat.java    |  5 ++--
 .../api/output/SavepointOutputFormatTest.java      |  2 +-
 .../flink/runtime/checkpoint/Checkpoints.java      | 17 ++++++------
 .../runtime/checkpoint/PendingCheckpoint.java      |  3 +--
 .../checkpoint/savepoint/SavepointSerializer.java  | 15 ++---------
 .../checkpoint/savepoint/SavepointSerializers.java | 30 +++++-----------------
 .../savepoint/SavepointV1Serializer.java           | 10 ++------
 .../savepoint/SavepointV2Serializer.java           |  8 +++---
 9 files changed, 29 insertions(+), 67 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
index d5789a3..6a80268 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -35,7 +35,7 @@ import java.util.stream.StreamSupport;
  * A reducer that aggregates multiple {@link OperatorState}'s into a single {@link Savepoint}.
  */
 @Internal
-public class MergeOperatorStates implements GroupReduceFunction<OperatorState, Savepoint> {
+public class MergeOperatorStates implements GroupReduceFunction<OperatorState, SavepointV2> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -48,8 +48,8 @@ public class MergeOperatorStates implements GroupReduceFunction<OperatorState, S
 	}
 
 	@Override
-	public void reduce(Iterable<OperatorState> values, Collector<Savepoint> out) {
-		Savepoint savepoint =
+	public void reduce(Iterable<OperatorState> values, Collector<SavepointV2> out) {
+		SavepointV2 savepoint =
 			new SavepointV2(
 				SnapshotUtils.CHECKPOINT_ID,
 				StreamSupport.stream(values.spliterator(), false).collect(Collectors.toList()),
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
index 190de52..f7c9366 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -45,7 +46,7 @@ import java.io.IOException;
  * <p>This format may only be executed with parallelism 1.
  */
 @Internal
-public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
+public class SavepointOutputFormat extends RichOutputFormat<SavepointV2> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -70,7 +71,7 @@ public class SavepointOutputFormat extends RichOutputFormat<Savepoint> {
 	}
 
 	@Override
-	public void writeRecord(Savepoint savepoint) throws IOException {
+	public void writeRecord(SavepointV2 savepoint) throws IOException {
 		String path = LambdaUtil.withContextClassLoader(getRuntimeContext().getUserCodeClassLoader(), () -> {
 				try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
 					Checkpoints.storeCheckpointMetadata(savepoint, out);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
index 7cc4b2f..cbc76ea 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SavepointOutputFormatTest.java
@@ -56,7 +56,7 @@ public class SavepointOutputFormatTest {
 		Path path = new Path(temporaryFolder.newFolder().getAbsolutePath());
 		SavepointOutputFormat format = createSavepointOutputFormat(path);
 
-		Savepoint savepoint = createSavepoint();
+		SavepointV2 savepoint = createSavepoint();
 
 		format.open(0, 1);
 		format.writeRecord(savepoint);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index a67b720..fa84742 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2Serializer;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -70,25 +71,23 @@ public class Checkpoints {
 	//  Writing out checkpoint metadata
 	// ------------------------------------------------------------------------
 
-	public static <T extends Savepoint> void storeCheckpointMetadata(
-			T checkpointMetadata,
+	public static void storeCheckpointMetadata(
+			SavepointV2 checkpointMetadata,
 			OutputStream out) throws IOException {
 
 		DataOutputStream dos = new DataOutputStream(out);
 		storeCheckpointMetadata(checkpointMetadata, dos);
 	}
 
-	public static <T extends Savepoint> void storeCheckpointMetadata(
-			T checkpointMetadata,
+	public static void storeCheckpointMetadata(
+			SavepointV2 checkpointMetadata,
 			DataOutputStream out) throws IOException {
 
 		// write generic header
 		out.writeInt(HEADER_MAGIC_NUMBER);
-		out.writeInt(checkpointMetadata.getVersion());
+		out.writeInt(SavepointV2.VERSION);
 
-		// write checkpoint metadata
-		SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(checkpointMetadata);
-		serializer.serialize(checkpointMetadata, out);
+		SavepointV2Serializer.serialize(checkpointMetadata, out);
 	}
 
 	// ------------------------------------------------------------------------
@@ -103,7 +102,7 @@ public class Checkpoints {
 
 		if (magicNumber == HEADER_MAGIC_NUMBER) {
 			final int version = in.readInt();
-			final SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
+			final SavepointSerializer serializer = SavepointSerializers.getSerializer(version);
 			return serializer.deserialize(in, classLoader);
 		}
 		else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1110926..352a2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -300,7 +299,7 @@ public class PendingCheckpoint {
 			// make sure we fulfill the promise with an exception if something fails
 			try {
 				// write out the metadata
-				final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
+				final SavepointV2 savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
 				final CompletedCheckpointStorageLocation finalizedLocation;
 
 				try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 6b0113f..5ac34d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 
 /**
@@ -28,16 +27,7 @@ import java.io.IOException;
  *
  * <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper.
  */
-public interface SavepointSerializer<T extends Savepoint> {
-
-	/**
-	 * Serializes a savepoint to an output stream.
-	 *
-	 * @param savepoint Savepoint to serialize
-	 * @param dos        Output stream to serialize the savepoint to
-	 * @throws IOException Serialization failures are forwarded
-	 */
-	void serialize(T savepoint, DataOutputStream dos) throws IOException;
+public interface SavepointSerializer {
 
 	/**
 	 * Deserializes a savepoint from an input stream.
@@ -47,6 +37,5 @@ public interface SavepointSerializer<T extends Savepoint> {
 	 * @return The deserialized savepoint
 	 * @throws IOException Serialization failures are forwarded
 	 */
-	T deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
-
+	SavepointV2 deserialize(DataInputStream dis, ClassLoader userCodeClassLoader) throws IOException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 336f17a..256aee1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -19,20 +19,21 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Helper to access {@link SavepointSerializer} for a specific savepoint version.
+ * Helper to access {@link SavepointSerializer}s for specific format versions.
+ *
+ * <p>The serializer for a specific version can be obtained via {@link #getSerializer(int)}.
  */
 public class SavepointSerializers {
 
 	/** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */
 	static boolean failWhenLegacyStateDetected = true;
 
-	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
+	private static final Map<Integer, SavepointSerializer> SERIALIZERS = new HashMap<>(2);
 
 	static {
 		SERIALIZERS.put(SavepointV1Serializer.VERSION, SavepointV1Serializer.INSTANCE);
@@ -46,33 +47,14 @@ public class SavepointSerializers {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Returns the {@link SavepointSerializer} for the given savepoint.
-	 *
-	 * @param savepoint Savepoint to get serializer for
-	 * @param <T>       Type of savepoint
-	 * @return Savepoint serializer for the savepoint
-	 * @throws IllegalArgumentException If unknown savepoint version
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T extends Savepoint> SavepointSerializer<T> getSerializer(T savepoint) {
-		Preconditions.checkNotNull(savepoint, "Savepoint");
-		SavepointSerializer<T> serializer = (SavepointSerializer<T>) SERIALIZERS.get(savepoint.getVersion());
-		if (serializer != null) {
-			return serializer;
-		} else {
-			throw new IllegalArgumentException("Unknown savepoint version " + savepoint.getVersion() + ".");
-		}
-	}
-
-	/**
 	 * Returns the {@link SavepointSerializer} for the given savepoint version.
 	 *
 	 * @param version Savepoint version to get serializer for
 	 * @return Savepoint for the given version
 	 * @throws IllegalArgumentException If unknown savepoint version
 	 */
-	public static SavepointSerializer<?> getSerializer(int version) {
-		SavepointSerializer<?> serializer = SERIALIZERS.get(version);
+	public static SavepointSerializer getSerializer(int version) {
+		SavepointSerializer serializer = SERIALIZERS.get(version);
 		if (serializer != null) {
 			return serializer;
 		} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 0bc7551..e0dd8b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -54,7 +54,7 @@ import java.util.Map;
  */
 @Internal
 @SuppressWarnings("deprecation")
-public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
+public class SavepointV1Serializer implements SavepointSerializer {
 
 	/** The savepoint version. */
 	public static final int VERSION = 1;
@@ -67,13 +67,7 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 
 	public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
 
-	private SavepointV1Serializer() {
-	}
-
-	@Override
-	public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
-		throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
-	}
+	private SavepointV1Serializer() {}
 
 	@Override
 	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 593bcf6..3d60e22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -70,8 +70,7 @@ import java.util.UUID;
  * </pre>
  */
 @Internal
-@VisibleForTesting
-public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+public class SavepointV2Serializer implements SavepointSerializer {
 
 	/** Random magic number for consistency checks. */
 	private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
@@ -95,8 +94,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 	//  (De)serialization entry points
 	// ------------------------------------------------------------------------
 
-	@Override
-	public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+	public static void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
 		// first: checkpoint ID
 		dos.writeLong(checkpointMetadata.getCheckpointId());
 
@@ -189,7 +187,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 	//  master state (de)serialization methods
 	// ------------------------------------------------------------------------
 
-	private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+	private static void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
 		// magic number for error detection
 		dos.writeInt(MASTER_STATE_MAGIC_NUMBER);