You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:24 UTC

[05/15] flink git commit: [FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

[FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

The separation of the TypeDeserializer interface from the TypeSerializer
base class is due to the fact that additionally implementing the
TypeDeserializer interface alters the generation order of anonymous
serializer classes for Scala case classes and collections.

Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69fada3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69fada3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69fada3d

Branch: refs/heads/master
Commit: 69fada3d0b4c686f29c356f00eb49039f416879f
Parents: 8d0c4c0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 11 15:30:36 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:37:46 2017 +0200

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   | 24 +++++++++++-
 .../typeutils/TypeDeserializerAdapter.java      | 40 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  2 +-
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 4c83ded..1e05d57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -60,6 +60,8 @@ public final class CompatibilityResult<T> {
 	 * @param convertDeserializer the convert deserializer to use, in the case that the preceding serializer
 	 *                            cannot be found.
 	 *
+	 * @param <T> the type of the data being migrated.
+	 *
 	 * @return a result that signals migration is necessary, also providing a convert deserializer.
 	 */
 	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T> convertDeserializer) {
@@ -69,11 +71,29 @@ public final class CompatibilityResult<T> {
 	}
 
 	/**
+	 * Returns a result that signals migration to be performed, and in the case that the preceding serializer
+	 * cannot be found or restored to read the previous data during migration, a provided convert serializer
+	 * can be used. The provided serializer will only be used for deserialization.
+	 *
+	 * @param convertSerializer the convert serializer to use, in the case that the preceding serializer
+	 *                          cannot be found. The provided serializer will only be used for deserialization.
+	 *
+	 * @param <T> the type of the data being migrated.
+	 *
+	 * @return a result that signals migration is necessary, also providing a convert serializer.
+	 */
+	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeSerializer<T> convertSerializer) {
+		Preconditions.checkNotNull(convertSerializer, "Convert serializer cannot be null.");
+
+		return new CompatibilityResult<>(true, new TypeDeserializerAdapter<>(convertSerializer));
+	}
+
+	/**
 	 * Returns a result that signals migration to be performed. The migration will fail if the preceding
 	 * serializer for the previous data cannot be found.
 	 *
-	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)},
-	 * which will be used as a fallback resort in such cases.
+	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)}
+	 * or {@link #requiresMigration(TypeSerializer)}, which will be used as a fallback resort in such cases.
 	 *
 	 * @return a result that signals migration is necessary, without providing a convert deserializer.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index e02bed4..fb59602 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -25,27 +26,42 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}.
+ * A utility class that is used to bridge a {@link TypeSerializer} and {@link TypeDeserializer}.
+ * It either wraps a type deserializer or serializer, and can only ever be used for deserialization
+ * (i.e. only read-related methods is functional).
  *
- * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer,
+ * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer or serializer,
  * while serialization methods are masked and not intended for use.
  *
  * @param <T> The data type that the deserializer deserializes.
  */
-public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+@Internal
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> implements TypeDeserializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	/** The actual wrapped deserializer instance */
+	/** The actual wrapped deserializer or serializer instance */
 	private final TypeDeserializer<T> deserializer;
+	private final TypeSerializer<T> serializer;
 
 	/**
-	 * Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}.
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeDeserializer}.
 	 *
 	 * @param deserializer the actual deserializer to wrap.
 	 */
 	public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
 		this.deserializer = Preconditions.checkNotNull(deserializer);
+		this.serializer = null;
+	}
+
+	/**
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeSerializer}.
+	 *
+	 * @param serializer the actual serializer to wrap.
+	 */
+	public TypeDeserializerAdapter(TypeSerializer<T> serializer) {
+		this.deserializer = null;
+		this.serializer = Preconditions.checkNotNull(serializer);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -53,31 +69,31 @@ public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
 	// --------------------------------------------------------------------------------------------
 
 	public T deserialize(DataInputView source) throws IOException {
-		return deserializer.deserialize(source);
+		return (deserializer != null) ? deserializer.deserialize(source) : serializer.deserialize(source);
 	}
 
 	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return deserializer.deserialize(reuse, source);
+		return (deserializer != null) ? deserializer.deserialize(reuse, source) : serializer.deserialize(reuse, source);
 	}
 
 	public TypeSerializer<T> duplicate() {
-		return deserializer.duplicate();
+		return (deserializer != null) ? deserializer.duplicate() : serializer.duplicate();
 	}
 
 	public int getLength() {
-		return deserializer.getLength();
+		return (deserializer != null) ? deserializer.getLength() : serializer.getLength();
 	}
 
 	public boolean equals(Object obj) {
-		return deserializer.equals(obj);
+		return (deserializer != null) ? deserializer.equals(obj) : serializer.equals(obj);
 	}
 
 	public boolean canEqual(Object obj) {
-		return deserializer.canEqual(obj);
+		return (deserializer != null) ? deserializer.canEqual(obj) : serializer.canEqual(obj);
 	}
 
 	public int hashCode() {
-		return deserializer.hashCode();
+		return (deserializer != null) ? deserializer.hashCode() : serializer.hashCode();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 85cbfdb..a606a18 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable {
+public abstract class TypeSerializer<T> implements Serializable {
 	
 	private static final long serialVersionUID = 1L;