You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:20 UTC
[flink] 07/12: [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e6664dfea9668c178047ebfc782278a176afaaf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:29:52 2018 +0800
[FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../api/common/typeutils/base/MapSerializer.java | 2 +-
.../base/MapSerializerConfigSnapshot.java | 3 +-
.../typeutils/base/MapSerializerSnapshot.java | 60 ++++++----------------
3 files changed, 18 insertions(+), 47 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index dd3b81b..bedaf69 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -202,6 +202,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
@Override
public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
- return new MapSerializerSnapshot<>(keySerializer, valueSerializer);
+ return new MapSerializerSnapshot<>(this);
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 000924f..2b78b52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
// redirect the compatibility check to the new MapSerializerConfigSnapshot
MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
- MapSerializerSnapshot<K, V> mapSerializerSnapshot =
- new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
+ MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
}
else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index be2e4b0..a6db0ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -18,78 +18,50 @@
package org.apache.flink.api.common.typeutils.base;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.Map;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* Snapshot class for the {@link MapSerializer}.
*/
-public class MapSerializerSnapshot<K, V> implements TypeSerializerSnapshot<Map<K, V>> {
+public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public MapSerializerSnapshot() {}
+ public MapSerializerSnapshot() {
+ super(MapSerializer.class);
+ }
/**
* Constructor to create the snapshot for writing.
*/
- public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
- Preconditions.checkNotNull(keySerializer);
- Preconditions.checkNotNull(valueSerializer);
- this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
+ public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
+ super(mapSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<Map<K, V>> restoreSerializer() {
- return new MapSerializer<>(
- nestedKeyValueSerializerSnapshot.getRestoreSerializer(0),
- nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
- }
+ protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
- @Override
- public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
- checkState(nestedKeyValueSerializerSnapshot != null);
+ @SuppressWarnings("unchecked")
+ TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
- if (newSerializer instanceof MapSerializer) {
- MapSerializer<K, V> serializer = (MapSerializer<K, V>) newSerializer;
-
- return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getKeySerializer(),
- serializer.getValueSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
+ return new MapSerializer<>(keySerializer, valueSerializer);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+ protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
}
}