You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:25 UTC

[flink] 12/12: [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bcaf2e0fdf9c05f257eeb67d67bf1320faeea8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 7 13:28:19 2019 +0100

    [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate
    
    After introducing CompositeTypeSerializerSnapshot, the
    CompositeSerializerSnapshot has been reworked to only deal with concerns
    of delegating reading and writing of the nested serializers' snapshots.
    It no longer deals with resolving the final compatibility result for the
    outer composite serializer.
    
    Therefore, it is renamed properly as NestedSerializersSnapshotDelegate,
    and also annotated as an internal class, since we want users to use the
    more powerful CompositeTypeSerializerSnapshot instead.
    
    This closes #7422.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 12 +++----
 ...java => NestedSerializersSnapshotDelegate.java} | 37 ++++++++++++----------
 .../base/GenericArraySerializerConfigSnapshot.java | 14 ++++----
 .../runtime/EitherSerializerSnapshot.java          | 16 +++++-----
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index 71c0836..c73e24c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -99,7 +99,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	private static final int HIGHEST_LEGACY_READ_VERSION = 2;
 
-	private CompositeSerializerSnapshot compositeSerializerSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSerializersSnapshotDelegate;
 
 	private final Class<S> correspondingSerializerClass;
 
@@ -120,7 +120,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	@SuppressWarnings("unchecked")
 	public CompositeTypeSerializerSnapshot(S serializerInstance) {
 		Preconditions.checkNotNull(serializerInstance);
-		this.compositeSerializerSnapshot = new CompositeSerializerSnapshot(getNestedSerializers(serializerInstance));
+		this.nestedSerializersSnapshotDelegate = new NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance));
 		this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass();
 	}
 
@@ -132,7 +132,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	@Override
 	public final void writeSnapshot(DataOutputView out) throws IOException {
 		internalWriteOuterSnapshot(out);
-		compositeSerializerSnapshot.writeCompositeSnapshot(out);
+		nestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -142,7 +142,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 		} else {
 			legacyInternalReadOuterSnapshot(readVersion, in, userCodeClassLoader);
 		}
-		this.compositeSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+		this.nestedSerializersSnapshotDelegate = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, userCodeClassLoader);
 	}
 
 	@Override
@@ -161,14 +161,14 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
 		return constructFinalSchemaCompatibilityResult(
 			getNestedSerializers(castedNewSerializer),
-			compositeSerializerSnapshot.getNestedSerializerSnapshots());
+			nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
 	}
 
 	@Override
 	public final TypeSerializer<T> restoreSerializer() {
 		@SuppressWarnings("unchecked")
 		TypeSerializer<T> serializer = (TypeSerializer<T>)
-			createOuterSerializerWithNestedSerializers(compositeSerializerSnapshot.getRestoreSerializers());
+			createOuterSerializerWithNestedSerializers(nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
 
 		return serializer;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
similarity index 81%
rename from flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
rename to flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
index 5b3f775..a4dcdd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,21 +30,21 @@ import java.util.List;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A CompositeSerializerSnapshot represents the snapshots of multiple serializers that are used
+ * A NestedSerializersSnapshotDelegate represents the snapshots of multiple serializers that are used
  * by an outer serializer. Examples would be tuples, where the outer serializer is the tuple
- * format serializer, an the CompositeSerializerSnapshot holds the serializers for the
+ * format serializer, and the NestedSerializersSnapshotDelegate holds the serializers for the
  * different tuple fields.
  *
- * <p>The CompositeSerializerSnapshot does not implement the {@link TypeSerializerSnapshot} interface.
+ * <p>The NestedSerializersSnapshotDelegate does not implement the {@link TypeSerializerSnapshot} interface.
  * It is not meant to be inherited from, but to be composed with a serializer snapshot implementation.
  *
- * <p>The CompositeSerializerSnapshot has its own versioning internally, it does not couple its
+ * <p>The NestedSerializersSnapshotDelegate has its own versioning internally, it does not couple its
  * versioning to the versioning of the TypeSerializerSnapshot that builds on top of this class.
- * That way, the CompositeSerializerSnapshot and enclosing TypeSerializerSnapshot the can evolve
+ * That way, the NestedSerializersSnapshotDelegate and the enclosing TypeSerializerSnapshot can evolve
  * their formats independently.
  */
-@PublicEvolving
-public class CompositeSerializerSnapshot {
+@Internal
+public class NestedSerializersSnapshotDelegate {
 
 	/** Magic number for integrity checks during deserialization. */
 	private static final int MAGIC_NUMBER = 1333245;
@@ -58,14 +58,14 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Constructor to create a snapshot for writing.
 	 */
-	public CompositeSerializerSnapshot(TypeSerializer<?>... serializers) {
+	public NestedSerializersSnapshotDelegate(TypeSerializer<?>... serializers) {
 		this.nestedSnapshots = TypeSerializerUtils.snapshotBackwardsCompatible(serializers);
 	}
 
 	/**
 	 * Constructor to create a snapshot during deserialization.
 	 */
-	private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
+	private NestedSerializersSnapshotDelegate(TypeSerializerSnapshot<?>[] snapshots) {
 		this.nestedSnapshots = snapshots;
 	}
 
@@ -77,14 +77,14 @@ public class CompositeSerializerSnapshot {
 	 * Produces a restore serializer from each contained serializer configuration snapshot.
 	 * The serializers are returned in the same order as the snapshots are stored.
 	 */
-	public TypeSerializer<?>[] getRestoreSerializers() {
+	public TypeSerializer<?>[] getRestoredNestedSerializers() {
 		return snapshotsToRestoreSerializers(nestedSnapshots);
 	}
 
 	/**
 	 * Creates the restore serializer from the pos-th config snapshot.
 	 */
-	public <T> TypeSerializer<T> getRestoreSerializer(int pos) {
+	public <T> TypeSerializer<T> getRestoredNestedSerializer(int pos) {
 		checkArgument(pos < nestedSnapshots.length);
 
 		@SuppressWarnings("unchecked")
@@ -105,6 +105,9 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Resolves the compatibility of the nested serializer snapshots with the nested
 	 * serializers of the new outer serializer.
+	 *
+	 * @deprecated this method will be removed in the future. Resolving compatibility for nested
+	 *             serializers is now handled by {@link CompositeTypeSerializerSnapshot}.
 	 */
 	@Deprecated
 	public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
@@ -145,7 +148,7 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Writes the composite snapshot of all the contained serializers.
 	 */
-	public final void writeCompositeSnapshot(DataOutputView out) throws IOException {
+	public final void writeNestedSerializerSnapshots(DataOutputView out) throws IOException {
 		out.writeInt(MAGIC_NUMBER);
 		out.writeInt(VERSION);
 
@@ -158,7 +161,7 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Reads the composite snapshot of all the contained serializers.
 	 */
-	public static CompositeSerializerSnapshot readCompositeSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		final int magicNumber = in.readInt();
 		if (magicNumber != MAGIC_NUMBER) {
 			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
@@ -177,14 +180,14 @@ public class CompositeSerializerSnapshot {
 			nestedSnapshots[i] = TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
 		}
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	/**
 	 * Reads the composite snapshot of all the contained serializers in a way that is compatible
 	 * with Version 1 of the deprecated {@link CompositeTypeSerializerConfigSnapshot}.
 	 */
-	public static CompositeSerializerSnapshot legacyReadProductSnapshots(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate legacyReadNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		@SuppressWarnings("deprecation")
 		final List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndSnapshots =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, cl);
@@ -193,7 +196,7 @@ public class CompositeSerializerSnapshot {
 				.map(t -> t.f1)
 				.toArray(TypeSerializerSnapshot<?>[]::new);
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 8cbe76c..cfc2e98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -54,7 +54,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -67,7 +67,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	 */
 	public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> serializer) {
 		this.componentClass = serializer.getComponentClass();
-		this.nestedSnapshot = new CompositeSerializerSnapshot(serializer.getComponentSerializer());
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
 	}
 
 	// ------------------------------------------------------------------------
@@ -81,7 +81,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(componentClass != null && nestedSnapshot != null);
 		out.writeUTF(componentClass.getName());
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -99,7 +99,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 
 		try (DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
 			componentClass = InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
@@ -111,13 +111,13 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
 		componentClass = InstantiationUtil.resolveClassByName(in, classLoader);
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<C[]> restoreSerializer() {
 		checkState(componentClass != null && nestedSnapshot != null);
-		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoreSerializer(0));
+		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 3b7a8e7..1779ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -47,7 +47,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -62,7 +62,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 			TypeSerializer<L> leftSerializer,
 			TypeSerializer<R> rightSerializer) {
 
-		this.nestedSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(leftSerializer, rightSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -75,7 +75,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	@Override
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(nestedSnapshot != null);
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -93,19 +93,19 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 	}
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<Either<L, R>> restoreSerializer() {
 		checkState(nestedSnapshot != null);
 		return new EitherSerializer<>(
-				nestedSnapshot.getRestoreSerializer(0),
-				nestedSnapshot.getRestoreSerializer(1));
+				nestedSnapshot.getRestoredNestedSerializer(0),
+				nestedSnapshot.getRestoredNestedSerializer(1));
 	}
 
 	@Override