You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:21 UTC
[flink] 08/12: [FLINK-11073] [core] Let
ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68fab1277ca31ad65ddd594b0b6b7d4d9c0e4383
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:34:23 2018 +0800
[FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../typeutils/ScalaEitherSerializerSnapshot.java | 62 ++++++----------------
.../api/scala/typeutils/EitherSerializer.scala | 2 +-
2 files changed, 17 insertions(+), 47 deletions(-)
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index b67e47b..26cfef5 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -18,81 +18,51 @@
package org.apache.flink.api.scala.typeutils;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
import scala.util.Either;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* Configuration snapshot for serializers of Scala's {@link Either} type,
* containing configuration snapshots of the Left and Right serializers.
*/
-public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public ScalaEitherSerializerSnapshot() {}
+ public ScalaEitherSerializerSnapshot() {
+ super(EitherSerializer.class);
+ }
/**
* Constructor to create the snapshot for writing.
*/
- public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
- Preconditions.checkNotNull(leftSerializer);
- Preconditions.checkNotNull(rightSerializer);
- this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+ public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+ super(eitherSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<Either<L, R>> restoreSerializer() {
- return new EitherSerializer<>(
- nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
- nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
- }
-
- @Override
- public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
- TypeSerializer<Either<L, R>> newSerializer) {
- checkState(nestedLeftRightSerializerSnapshot != null);
+ protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];
- if (newSerializer instanceof EitherSerializer) {
- EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+ @SuppressWarnings("unchecked")
+ TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];
- return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getLeftSerializer(),
- serializer.getRightSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+ return new EitherSerializer<>(leftSerializer, rightSerializer);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+ protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
}
}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 68432a6..0427bb3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -125,7 +125,7 @@ class EitherSerializer[A, B](
// --------------------------------------------------------------------------------------------
override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
- new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
+ new ScalaEitherSerializerSnapshot[A, B](this)
}
override def ensureCompatibility(