You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:21 UTC

[flink] 08/12: [FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68fab1277ca31ad65ddd594b0b6b7d4d9c0e4383
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:34:23 2018 +0800

    [FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../typeutils/ScalaEitherSerializerSnapshot.java   | 62 ++++++----------------
 .../api/scala/typeutils/EitherSerializer.scala     |  2 +-
 2 files changed, 17 insertions(+), 47 deletions(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index b67e47b..26cfef5 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -18,81 +18,51 @@
 
 package org.apache.flink.api.scala.typeutils;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 
 import scala.util.Either;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ScalaEitherSerializerSnapshot() {}
+	public ScalaEitherSerializerSnapshot() {
+		super(EitherSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-		Preconditions.checkNotNull(leftSerializer);
-		Preconditions.checkNotNull(rightSerializer);
-		this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+	public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+		super(eitherSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Either<L, R>> restoreSerializer() {
-		return new EitherSerializer<>(
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
-			TypeSerializer<Either<L, R>> newSerializer) {
-		checkState(nestedLeftRightSerializerSnapshot != null);
+	protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];
 
-		if (newSerializer instanceof EitherSerializer) {
-			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+		@SuppressWarnings("unchecked")
+		TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];
 
-			return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getLeftSerializer(),
-				serializer.getRightSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+		return new EitherSerializer<>(leftSerializer, rightSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
 	}
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 68432a6..0427bb3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -125,7 +125,7 @@ class EitherSerializer[A, B](
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
-    new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
+    new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
   override def ensureCompatibility(