You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:18 UTC

[flink] 05/12: [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eda73763dfad7c7367f03556134637cf8edc3160
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:22:01 2018 +0800

    [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../flink/cep/nfa/sharedbuffer/Lockable.java       |  2 +-
 .../LockableTypeSerializerSnapshot.java            | 56 ++++++----------------
 2 files changed, 15 insertions(+), 43 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index 7afbc50..ae1452b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -187,7 +187,7 @@ public final class Lockable<T> {
 
 		@Override
 		public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
-			return new LockableTypeSerializerSnapshot<>(elementSerializer);
+			return new LockableTypeSerializerSnapshot<>(this);
 		}
 
 		/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 44a4670..13867ac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -19,74 +19,46 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
  */
 @Internal
-public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {
+public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public LockableTypeSerializerSnapshot() {}
+	public LockableTypeSerializerSnapshot() {
+		super(Lockable.LockableTypeSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	public LockableTypeSerializerSnapshot(Lockable.LockableTypeSerializer<E> lockableTypeSerializer) {
+		super(lockableTypeSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Lockable<E>> restoreSerializer() {
-		return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+	protected Lockable.LockableTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<E> elementSerializer = (TypeSerializer<E>) nestedSerializers[0];
+		return new Lockable.LockableTypeSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof Lockable.LockableTypeSerializer) {
-			Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
+	protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
-	}
-
-	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
-	}
-
 }