You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:18 UTC
[flink] 05/12: [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit eda73763dfad7c7367f03556134637cf8edc3160
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:22:01 2018 +0800
[FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../flink/cep/nfa/sharedbuffer/Lockable.java | 2 +-
.../LockableTypeSerializerSnapshot.java | 56 ++++++----------------
2 files changed, 15 insertions(+), 43 deletions(-)
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index 7afbc50..ae1452b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -187,7 +187,7 @@ public final class Lockable<T> {
@Override
public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
- return new LockableTypeSerializerSnapshot<>(elementSerializer);
+ return new LockableTypeSerializerSnapshot<>(this);
}
/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 44a4670..13867ac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -19,74 +19,46 @@
package org.apache.flink.cep.nfa.sharedbuffer;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
/**
* A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
*/
@Internal
-public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {
+public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public LockableTypeSerializerSnapshot() {}
+ public LockableTypeSerializerSnapshot() {
+ super(Lockable.LockableTypeSerializer.class);
+ }
/**
* Constructor to create the snapshot for writing.
*/
- public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
- this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+ public LockableTypeSerializerSnapshot(Lockable.LockableTypeSerializer<E> lockableTypeSerializer) {
+ super(lockableTypeSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<Lockable<E>> restoreSerializer() {
- return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+ protected Lockable.LockableTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<E> elementSerializer = (TypeSerializer<E>) nestedSerializers[0];
+ return new Lockable.LockableTypeSerializer<>(elementSerializer);
}
@Override
- public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
- checkState(nestedElementSerializerSnapshot != null);
-
- if (newSerializer instanceof Lockable.LockableTypeSerializer) {
- Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;
-
- return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getElementSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
+ protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
}
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
- }
-
- @Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
- }
-
}