You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/22 15:30:00 UTC
[2/3] flink git commit: [FLINK-6482] [core] Add nested serializers to
config snapshots of composite serializers
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a8368c4..2311158 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -34,13 +34,20 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -134,6 +141,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
this.subclassSerializerCache = new HashMap<>();
}
+ /** Creates a serializer directly from the given fields, field serializers, subclass registrations and subclass serializer cache; every argument must be non-null. */
+ public PojoSerializer(
+ Class<T> clazz,
+ Field[] fields,
+ TypeSerializer<Object>[] fieldSerializers,
+ LinkedHashMap<Class<?>, Integer> registeredClasses,
+ TypeSerializer<?>[] registeredSerializers,
+ HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache) {
+
+ this.clazz = checkNotNull(clazz);
+ this.fields = checkNotNull(fields);
+ this.numFields = fields.length; // derived from the array, not passed separately
+ this.fieldSerializers = checkNotNull(fieldSerializers);
+ this.registeredClasses = checkNotNull(registeredClasses);
+ this.registeredSerializers = checkNotNull(registeredSerializers);
+ this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
+
+ this.executionConfig = null; // this constructor takes no ExecutionConfig
+ }
@Override
public boolean isImmutableType() {
@@ -558,6 +584,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot;
+ boolean requiresMigration = false;
+
if (clazz.equals(config.getTypeClass())) {
if (this.numFields == config.getFieldToSerializerConfigSnapshot().size()) {
@@ -572,16 +600,27 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
int i = 0;
- for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+ for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
: config.getFieldToSerializerConfigSnapshot().entrySet()) {
int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
if (fieldIndex != -1) {
reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
- compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+ compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ fieldToConfigSnapshotEntry.getValue().f0,
+ UnloadableDummyTypeSerializer.class,
+ fieldToConfigSnapshotEntry.getValue().f1,
+ fieldSerializers[fieldIndex]);
+
if (compatResult.isRequiresMigration()) {
- return CompatibilityResult.requiresMigration();
+ requiresMigration = true;
+
+ if (compatResult.getConvertDeserializer() != null) {
+ reorderedFieldSerializers[i] = (TypeSerializer<Object>) compatResult.getConvertDeserializer();
+ } else {
+ return CompatibilityResult.requiresMigration();
+ }
} else {
reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
}
@@ -599,7 +638,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags;
final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers;
- final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations =
+ final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousRegistrations =
config.getRegisteredSubclassesToSerializerConfigSnapshots();
// the reconfigured list of registered subclasses will be the previous registered
@@ -615,11 +654,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
reorderedRegisteredSubclasses, executionConfig);
i = 0;
- for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
+ for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousRegisteredSerializerConfig : previousRegistrations.values()) {
// check compatibility of subclass serializer
- compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+ compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ previousRegisteredSerializerConfig.f0,
+ UnloadableDummyTypeSerializer.class,
+ previousRegisteredSerializerConfig.f1,
+ reorderedRegisteredSubclassSerializers[i]);
+
if (compatResult.isRequiresMigration()) {
- return CompatibilityResult.requiresMigration();
+ requiresMigration = true;
+
+ if (compatResult.getConvertDeserializer() == null) {
+ return CompatibilityResult.requiresMigration();
+ }
}
i++;
@@ -631,15 +679,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
// this won't be applied to this serializer until all compatibility checks have been completed
HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>();
- for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry
+ for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousCachedEntry
: config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
// check compatibility of cached subclass serializer
- compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+ compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ previousCachedEntry.getValue().f0,
+ UnloadableDummyTypeSerializer.class,
+ previousCachedEntry.getValue().f1,
+ cachedSerializer);
+
if (compatResult.isRequiresMigration()) {
- return CompatibilityResult.requiresMigration();
+ requiresMigration = true;
+
+ if (compatResult.getConvertDeserializer() != null) {
+ rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+ } else {
+ return CompatibilityResult.requiresMigration();
+ }
} else {
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
}
@@ -648,15 +707,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
// completed compatibility checks; if no nested serializer required migration, we can just
// reconfigure this serializer so that it is compatible; otherwise a reconfigured copy is returned as the convert deserializer
- this.fields = reorderedFields;
- this.fieldSerializers = reorderedFieldSerializers;
+ if (!requiresMigration) {
+ this.fields = reorderedFields;
+ this.fieldSerializers = reorderedFieldSerializers;
- this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
- this.registeredSerializers = reorderedRegisteredSubclassSerializers;
+ this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
+ this.registeredSerializers = reorderedRegisteredSubclassSerializers;
- this.subclassSerializerCache = rebuiltCache;
+ this.subclassSerializerCache = rebuiltCache;
- return CompatibilityResult.compatible();
+ return CompatibilityResult.compatible();
+ } else {
+ return CompatibilityResult.requiresMigration(
+ new PojoSerializer<>(
+ clazz,
+ reorderedFields,
+ reorderedFieldSerializers,
+ reorderedRegisteredSubclassesToClasstags,
+ reorderedRegisteredSubclassSerializers,
+ rebuiltCache));
+ }
}
}
}
@@ -669,39 +739,56 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
private static final int VERSION = 1;
/**
- * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers.
+ * Ordered map of POJO fields to their corresponding serializers and their configuration snapshots.
*
* <p>Ordering of the fields is kept so that new Pojo serializers for previous data
* may reorder the fields in case they are different. The order of the fields needs to
* stay the same for binary compatibility, as the field order is part of the serialization format.
*/
- private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot;
+ private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
/**
- * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers.
+ * Ordered map of registered subclasses to their corresponding serializers and their configuration snapshots.
*
* <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data
* may retain the same class tag used for registered subclasses. Newly registered subclasses that
* weren't present before should be appended with the next available class tag.
*/
- private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots;
+ private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots;
/**
- * Configuration snapshots of previously cached non-registered subclass serializers.
+ * Previously cached non-registered subclass serializers and their configuration snapshots.
*
* <p>This is kept so that new Pojo serializers may eagerly repopulate their
* cache with reconfigured subclass serializers.
*/
- private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots;
+ private HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+ private boolean ignoreTypeSerializerSerialization;
/** This empty nullary constructor is required for deserializing the configuration. */
public PojoSerializerConfigSnapshot() {}
public PojoSerializerConfigSnapshot(
Class<T> pojoType,
- LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
- LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
- HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+ LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+ LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
+ HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+ this(
+ pojoType,
+ fieldToSerializerConfigSnapshot,
+ registeredSubclassesToSerializerConfigSnapshots,
+ nonRegisteredSubclassesToSerializerConfigSnapshots,
+ false);
+ }
+
+ public PojoSerializerConfigSnapshot(
+ Class<T> pojoType,
+ LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+ LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
+ HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots,
+ boolean ignoreTypeSerializerSerialization) {
super(pojoType);
@@ -711,37 +798,71 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
this.nonRegisteredSubclassesToSerializerConfigSnapshots =
Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+
+ this.ignoreTypeSerializerSerialization = ignoreTypeSerializerSerialization;
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
- // --- write fields and their serializers, in order
+ try (
+ ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+ DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
- out.writeInt(fieldToSerializerConfigSnapshot.size());
- for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry
- : fieldToSerializerConfigSnapshot.entrySet()) {
- out.writeUTF(entry.getKey().getName());
- TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
- }
+ // --- write fields and their serializers, in order
- // --- write registered subclasses and their serializers, in registration order
+ out.writeInt(fieldToSerializerConfigSnapshot.size());
+ for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+ : fieldToSerializerConfigSnapshot.entrySet()) {
- out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
- for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
- : registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
- out.writeUTF(entry.getKey().getName());
- TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
- }
+ outViewWrapper.writeUTF(entry.getKey().getName());
+
+ out.writeInt(outWithPos.getPosition());
+ if (!ignoreTypeSerializerSerialization) {
+ TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+ }
+
+ out.writeInt(outWithPos.getPosition());
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+ }
+
+ // --- write registered subclasses and their serializers, in registration order
+
+ out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+ for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+ : registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+ outViewWrapper.writeUTF(entry.getKey().getName());
+
+ out.writeInt(outWithPos.getPosition());
+ if (!ignoreTypeSerializerSerialization) {
+ TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+ }
+
+ out.writeInt(outWithPos.getPosition());
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+ }
+
+ // --- write snapshot of non-registered subclass serializer cache
+
+ out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+ for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+ : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+
+ outViewWrapper.writeUTF(entry.getKey().getName());
+
+ out.writeInt(outWithPos.getPosition());
+ if (!ignoreTypeSerializerSerialization) {
+ TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0);
+ }
- // --- write snapshot of non-registered subclass serializer cache
+ out.writeInt(outWithPos.getPosition());
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1);
+ }
- out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
- for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
- : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
- out.writeUTF(entry.getKey().getName());
- TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+ out.writeInt(outWithPos.getPosition());
+ out.write(outWithPos.getBuf(), 0 , outWithPos.getPosition());
}
}
@@ -749,74 +870,126 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
public void read(DataInputView in) throws IOException {
super.read(in);
- // --- read fields and their serializers, in order
-
int numFields = in.readInt();
- this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
- String fieldName;
- Field field;
+ int[] fieldSerializerOffsets = new int[numFields * 2];
for (int i = 0; i < numFields; i++) {
- fieldName = in.readUTF();
+ fieldSerializerOffsets[i * 2] = in.readInt();
+ fieldSerializerOffsets[i * 2 + 1] = in.readInt();
+ }
- // search all superclasses for the field
- Class<?> clazz = getTypeClass();
- field = null;
- while (clazz != null) {
- try {
- field = clazz.getDeclaredField(fieldName);
- field.setAccessible(true);
- break;
- } catch (NoSuchFieldException e) {
- clazz = clazz.getSuperclass();
+
+ int numRegisteredSubclasses = in.readInt();
+ int[] registeredSerializerOffsets = new int[numRegisteredSubclasses * 2];
+ for (int i = 0; i < numRegisteredSubclasses; i++) {
+ registeredSerializerOffsets[i * 2] = in.readInt();
+ registeredSerializerOffsets[i * 2 + 1] = in.readInt();
+ }
+
+ int numCachedSubclassSerializers = in.readInt();
+ int[] cachedSerializerOffsets = new int[numCachedSubclassSerializers * 2];
+ for (int i = 0; i < numCachedSubclassSerializers; i++) {
+ cachedSerializerOffsets[i * 2] = in.readInt();
+ cachedSerializerOffsets[i * 2 + 1] = in.readInt();
+ }
+
+ int totalBytes = in.readInt();
+ byte[] buffer = new byte[totalBytes];
+ in.readFully(buffer);
+
+ try (
+ ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+ DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos)) {
+
+ // --- read fields and their serializers, in order
+
+ this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
+ String fieldName;
+ Field field;
+ TypeSerializer<?> fieldSerializer;
+ TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
+ for (int i = 0; i < numFields; i++) {
+ fieldName = inViewWrapper.readUTF();
+
+ // search all superclasses for the field
+ Class<?> clazz = getTypeClass();
+ field = null;
+ while (clazz != null) {
+ try {
+ field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ break;
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
}
- }
- if (field == null) {
- // the field no longer exists in the POJO
- throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
- } else {
- fieldToSerializerConfigSnapshot.put(
+ if (field == null) {
+ // the field no longer exists in the POJO
+ throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
+ } else {
+ inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+ fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+
+ inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+ fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+
+ fieldToSerializerConfigSnapshot.put(
field,
- TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
+ }
}
- }
- // --- read registered subclasses and their serializers, in registration order
+ // --- read registered subclasses and their serializers, in registration order
- int numRegisteredSubclasses = in.readInt();
- this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
- String registeredSubclassname;
- Class<?> registeredSubclass;
- for (int i = 0; i < numRegisteredSubclasses; i++) {
- registeredSubclassname = in.readUTF();
- try {
- registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
- }
+ this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
+ String registeredSubclassname;
+ Class<?> registeredSubclass;
+ TypeSerializer<?> registeredSubclassSerializer;
+ TypeSerializerConfigSnapshot registeredSubclassSerializerConfigSnapshot;
+ for (int i = 0; i < numRegisteredSubclasses; i++) {
+ registeredSubclassname = inViewWrapper.readUTF();
+ try {
+ registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
+ }
- this.registeredSubclassesToSerializerConfigSnapshots.put(
- registeredSubclass,
- TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
- }
+ inWithPos.setPosition(registeredSerializerOffsets[i * 2]);
+ registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
- // --- read snapshot of non-registered subclass serializer cache
+ inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]);
+ registeredSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
- int numCachedSubclassSerializers = in.readInt();
- this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
- String cachedSubclassname;
- Class<?> cachedSubclass;
- for (int i = 0; i < numCachedSubclassSerializers; i++) {
- cachedSubclassname = in.readUTF();
- try {
- cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+ this.registeredSubclassesToSerializerConfigSnapshots.put(
+ registeredSubclass,
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(registeredSubclassSerializer, registeredSubclassSerializerConfigSnapshot));
}
- this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+ // --- read snapshot of non-registered subclass serializer cache
+
+ this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
+ String cachedSubclassname;
+ Class<?> cachedSubclass;
+ TypeSerializer<?> cachedSubclassSerializer;
+ TypeSerializerConfigSnapshot cachedSubclassSerializerConfigSnapshot;
+ for (int i = 0; i < numCachedSubclassSerializers; i++) {
+ cachedSubclassname = inViewWrapper.readUTF();
+ try {
+ cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+ }
+
+ inWithPos.setPosition(cachedSerializerOffsets[i * 2]);
+ cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
+
+ inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]);
+ cachedSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+
+ this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
cachedSubclass,
- TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(cachedSubclassSerializer, cachedSubclassSerializerConfigSnapshot));
+ }
}
}
@@ -825,15 +998,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
return VERSION;
}
- public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() {
+ public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
return fieldToSerializerConfigSnapshot;
}
- public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() {
+ public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getRegisteredSubclassesToSerializerConfigSnapshots() {
return registeredSubclassesToSerializerConfigSnapshots;
}
- public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+ public HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
return nonRegisteredSubclassesToSerializerConfigSnapshots;
}
@@ -1001,27 +1174,35 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
TypeSerializer<?>[] fieldSerializers,
HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
- final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots =
+ final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
new LinkedHashMap<>(fields.length);
for (int i = 0; i < fields.length; i++) {
- fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration());
+ fieldToSerializerConfigSnapshots.put(
+ fields[i],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
}
- final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots =
+ final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots =
new LinkedHashMap<>(registeredSubclassesToTags.size());
for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) {
registeredSubclassesToSerializerConfigSnapshots.put(
entry.getKey(),
- registeredSubclassSerializers[entry.getValue()].snapshotConfiguration());
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ registeredSubclassSerializers[entry.getValue()],
+ registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()));
}
- final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots =
+ final HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots =
new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size());
for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) {
- nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration());
+ nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+ entry.getKey(),
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ entry.getValue(),
+ entry.getValue().snapshotConfiguration()));
}
return new PojoSerializerConfigSnapshot<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index ba41d4b..bd08b04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -19,11 +19,13 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Row;
@@ -31,6 +33,7 @@ import org.apache.flink.types.Row;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;
+import java.util.List;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
@@ -254,22 +257,28 @@ public final class RowSerializer extends TypeSerializer<Row> {
@Override
public RowSerializerConfigSnapshot snapshotConfiguration() {
- return new RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+ return new RowSerializerConfigSnapshot(fieldSerializers);
}
@Override
public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof RowSerializerConfigSnapshot) {
- TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
- ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+ ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
- if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+ if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
boolean requireMigration = false;
TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
CompatibilityResult<?> compatResult;
- for (int i = 0; i < fieldSerializers.length; i++) {
- compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+ int i = 0;
+ for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+ compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ f.f0,
+ UnloadableDummyTypeSerializer.class,
+ f.f1,
+ fieldSerializers[i]);
+
if (compatResult.isRequiresMigration()) {
requireMigration = true;
@@ -281,6 +290,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
}
}
+
+ i++;
}
if (requireMigration) {
@@ -301,8 +312,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
/** This empty nullary constructor is required for deserializing the configuration. */
public RowSerializerConfigSnapshot() {}
- public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
- super(fieldSerializerConfigSnapshots);
+ public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
+ super(fieldSerializers);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index f485c3e..911c96f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -156,4 +156,9 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
throw new RuntimeException("Cannot instantiate tuple.", e);
}
}
+
+ @Override
+ protected TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+ return new TupleSerializer<>(tupleClass, fieldSerializers);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 032c3f1..f12dcd9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -125,9 +129,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@Override
public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new TupleSerializerConfigSnapshot<>(
- tupleClass,
- TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+ return new TupleSerializerConfigSnapshot<>(tupleClass, fieldSerializers);
}
@SuppressWarnings("unchecked")
@@ -137,24 +139,48 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
if (tupleClass.equals(config.getTupleClass())) {
- TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
- ((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
+ ((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+
+ if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
+
+ TypeSerializer<Object>[] convertFieldSerializers = new TypeSerializer[fieldSerializers.length];
+ boolean requiresMigration = false;
+ CompatibilityResult<Object> compatResult;
+ int i = 0;
+ for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
+ compatResult = CompatibilityUtil.resolveCompatibilityResult(
+ f.f0,
+ UnloadableDummyTypeSerializer.class,
+ f.f1,
+ fieldSerializers[i]);
- if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
-
- CompatibilityResult compatResult;
- for (int i = 0; i < fieldSerializers.length; i++) {
- compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
if (compatResult.isRequiresMigration()) {
- return CompatibilityResult.requiresMigration();
+ requiresMigration = true;
+
+ if (compatResult.getConvertDeserializer() != null) {
+ convertFieldSerializers[i] =
+ new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
+ } else {
+ return CompatibilityResult.requiresMigration();
+ }
}
+
+ i++;
}
- return CompatibilityResult.compatible();
+ if (!requiresMigration) {
+ return CompatibilityResult.compatible();
+ } else {
+ return CompatibilityResult.requiresMigration(
+ createSerializerInstance(tupleClass, convertFieldSerializers));
+ }
}
}
}
return CompatibilityResult.requiresMigration();
}
+
+ protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
index 6d2bb5f..1e7701c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
@@ -41,11 +41,8 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali
/** This empty nullary constructor is required for deserializing the configuration. */
public TupleSerializerConfigSnapshot() {}
- public TupleSerializerConfigSnapshot(
- Class<T> tupleClass,
- TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
-
- super(fieldSerializerConfigSnapshots);
+ public TupleSerializerConfigSnapshot(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
+ super(fieldSerializers);
this.tupleClass = Preconditions.checkNotNull(tupleClass);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 73c4379..57015c7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -109,14 +109,14 @@ public abstract class SerializerTestBase<T> extends TestLogger {
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
new DataOutputViewStreamWrapper(out), configSnapshot);
serializedConfig = out.toByteArray();
}
TypeSerializerConfigSnapshot restoredConfig;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+ restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
deleted file mode 100644
index 0783bb6..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * Unit tests related to {@link TypeSerializerConfigSnapshot}.
- */
-public class TypeSerializerConfigSnapshotTest {
-
- /**
- * Verifies that reading and writing configuration snapshots work correctly.
- */
- @Test
- public void testSerializeConfigurationSnapshots() throws Exception {
- TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo");
- TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar");
-
- byte[] serializedConfig;
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshots(
- new DataOutputViewStreamWrapper(out),
- configSnapshot1,
- configSnapshot2);
-
- serializedConfig = out.toByteArray();
- }
-
- TypeSerializerConfigSnapshot[] restoredConfigs;
- try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots(
- new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
- }
-
- assertEquals(2, restoredConfigs.length);
- assertEquals(configSnapshot1, restoredConfigs[0]);
- assertEquals(configSnapshot2, restoredConfigs[1]);
- }
-
- /**
- * Verifies that deserializing config snapshots fail if the config class could not be found.
- */
- @Test
- public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
- byte[] serializedConfig;
- try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(
- new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar"));
- serializedConfig = out.toByteArray();
- }
-
- try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- // read using a dummy classloader
- TypeSerializerUtil.readSerializerConfigSnapshot(
- new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
- fail("Expected a ClassNotFoundException wrapped in IOException");
- } catch (IOException expected) {
- // test passes
- }
- }
-
- public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
-
- static final int VERSION = 1;
-
- private int val;
- private String msg;
-
- public TestConfigSnapshot() {}
-
- public TestConfigSnapshot(int val, String msg) {
- this.val = val;
- this.msg = msg;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
- out.writeInt(val);
- out.writeUTF(msg);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
- val = in.readInt();
- msg = in.readUTF();
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- if (obj instanceof TestConfigSnapshot) {
- return val == ((TestConfigSnapshot) obj).val && msg.equals(((TestConfigSnapshot) obj).msg);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return 31 * val + msg.hashCode();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
deleted file mode 100644
index db1b4ef..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(InstantiationUtil.class)
-public class TypeSerializerSerializationProxyTest {
-
- @Test
- public void testStateSerializerSerializationProxy() throws Exception {
-
- TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
- TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
- byte[] serialized;
- try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
- proxy.write(new DataOutputViewStreamWrapper(out));
- serialized = out.toByteArray();
- }
-
- proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
-
- try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- proxy.read(new DataInputViewStreamWrapper(in));
- }
-
- Assert.assertEquals(serializer, proxy.getTypeSerializer());
- }
-
- @Test
- public void testStateSerializerSerializationProxyClassNotFound() throws Exception {
-
- TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
- TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
- byte[] serialized;
- try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
- proxy.write(new DataOutputViewStreamWrapper(out));
- serialized = out.toByteArray();
- }
-
- proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
-
- try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- proxy.read(new DataInputViewStreamWrapper(in));
- fail("ClassNotFoundException expected, leading to IOException");
- } catch (IOException expected) {
-
- }
-
- proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
-
- try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- proxy.read(new DataInputViewStreamWrapper(in));
- }
-
- Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
- Assert.assertArrayEquals(
- InstantiationUtil.serializeObject(serializer),
- ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
- }
-
- @Test
- public void testStateSerializerSerializationProxyInvalidClass() throws Exception {
-
- TypeSerializer<?> serializer = IntSerializer.INSTANCE;
-
- TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
-
- byte[] serialized;
- try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
- proxy.write(new DataOutputViewStreamWrapper(out));
- serialized = out.toByteArray();
- }
-
- PowerMockito.spy(InstantiationUtil.class);
- PowerMockito
- .doThrow(new InvalidClassException("test invalid class exception"))
- .when(InstantiationUtil.class, "deserializeObject", any(byte[].class), any(ClassLoader.class));
-
- proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
-
- try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- proxy.read(new DataInputViewStreamWrapper(in));
- fail("InvalidClassException expected, leading to IOException");
- } catch (IOException expected) {
-
- }
-
- proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
-
- try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- proxy.read(new DataInputViewStreamWrapper(in));
- }
-
- Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
-
- Assert.assertArrayEquals(
- InstantiationUtil.serializeObject(serializer),
- ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
new file mode 100644
index 0000000..738644b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link TypeSerializerSerializationUtil}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
+public class TypeSerializerSerializationUtilTest {
+
+ /**
+ * Verifies that reading and writing serializers work correctly.
+ */
+ @Test
+ public void testSerializerSerialization() throws Exception {
+
+ TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+ serialized = out.toByteArray();
+ }
+
+ TypeSerializer<?> deserializedSerializer;
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ Assert.assertEquals(serializer, deserializedSerializer);
+ }
+
+ /**
+ * Verifies deserialization failure cases when reading a serializer from bytes, in the
+ * case of a {@link ClassNotFoundException}.
+ */
+ @Test
+ public void testSerializerSerializationWithClassNotFound() throws Exception {
+
+ TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+ serialized = out.toByteArray();
+ }
+
+ TypeSerializer<?> deserializedSerializer;
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+ new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+ }
+ Assert.assertEquals(null, deserializedSerializer);
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+ new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), true);
+ }
+ Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer);
+
+ Assert.assertArrayEquals(
+ InstantiationUtil.serializeObject(serializer),
+ ((UnloadableDummyTypeSerializer<?>) deserializedSerializer).getActualBytes());
+ }
+
+ /**
+ * Verifies deserialization failure cases when reading a serializer from bytes, in the
+ * case of an {@link InvalidClassException}.
+ */
+ @Test
+ public void testSerializerSerializationWithInvalidClass() throws Exception {
+
+ TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
+ serialized = out.toByteArray();
+ }
+
+ TypeSerializer<?> deserializedSerializer;
+
+ // mock failure when deserializing serializers
+ TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+ mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+ doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+ PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+ Assert.assertEquals(null, deserializedSerializer);
+ }
+
+ /**
+ * Verifies that reading and writing configuration snapshots work correctly.
+ */
+ @Test
+ public void testSerializeConfigurationSnapshots() throws Exception {
+ TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot1 =
+ new TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo");
+
+ TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot2 =
+ new TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar");
+
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshots(
+ new DataOutputViewStreamWrapper(out),
+ configSnapshot1,
+ configSnapshot2);
+
+ serializedConfig = out.toByteArray();
+ }
+
+ TypeSerializerConfigSnapshot[] restoredConfigs;
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ restoredConfigs = TypeSerializerSerializationUtil.readSerializerConfigSnapshots(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ assertEquals(2, restoredConfigs.length);
+ assertEquals(configSnapshot1, restoredConfigs[0]);
+ assertEquals(configSnapshot2, restoredConfigs[1]);
+ }
+
+ /**
+ * Verifies that deserializing config snapshots fails if the config class could not be found.
+ */
+ @Test
+ public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
+ new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar"));
+ serializedConfig = out.toByteArray();
+ }
+
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ // read using a dummy classloader
+ TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+ fail("Expected a ClassNotFoundException wrapped in IOException");
+ } catch (IOException expected) {
+ // test passes
+ }
+ }
+
+ /**
+ * Verifies resilience to serializer deserialization failures when writing and reading
+ * serializer and config snapshot pairs.
+ */
+ @Test
+ public void testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures() throws Exception {
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = Arrays.asList(
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ IntSerializer.INSTANCE, IntSerializer.INSTANCE.snapshotConfiguration()),
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ DoubleSerializer.INSTANCE, DoubleSerializer.INSTANCE.snapshotConfiguration()));
+
+ byte[] serializedSerializersAndConfigs;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+ new DataOutputViewStreamWrapper(out), serializersAndConfigs);
+ serializedSerializersAndConfigs = out.toByteArray();
+ }
+
+ // mock failure when deserializing serializers
+ TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
+ mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+ doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+ PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+ List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored;
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) {
+ restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ Assert.assertEquals(2, restored.size());
+ Assert.assertEquals(null, restored.get(0).f0);
+ Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1);
+ Assert.assertEquals(null, restored.get(1).f0);
+ Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
+ }
+
+ public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+ static final int VERSION = 1;
+
+ private int val;
+ private String msg;
+
+ public TestConfigSnapshot() {}
+
+ public TestConfigSnapshot(int val, String msg) {
+ this.val = val;
+ this.msg = msg;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+ out.writeInt(val);
+ out.writeUTF(msg);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+ val = in.readInt();
+ msg = in.readUTF();
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (obj instanceof TypeSerializerSerializationUtilTest.TestConfigSnapshot) {
+ return val == ((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).val
+ && msg.equals(((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).msg);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * val + msg.hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 16ea945..e3ce3ee 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.InstantiationUtil;
@@ -95,14 +95,14 @@ public class EnumSerializerTest extends TestLogger {
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration());
serializedConfig = out.toByteArray();
}
TypeSerializerConfigSnapshot restoredConfig;
try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot(
+ restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index c77ffcc..10f4708 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -20,12 +20,14 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Random;
@@ -39,8 +41,9 @@ import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -50,14 +53,23 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
/**
* A test for the {@link PojoSerializer}.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TypeSerializerSerializationUtil.class)
public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> {
private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class);
@@ -286,7 +298,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -295,7 +307,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
// read configuration again from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -322,7 +334,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -335,7 +347,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
// read configuration from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -368,7 +380,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -378,7 +390,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
// read configuration from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -426,7 +438,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -438,7 +450,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
// read configuration from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -472,14 +484,38 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
// creating this serializer just for generating config snapshots of the field serializers
PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
- LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot =
+ LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
new LinkedHashMap<>(mockOriginalFieldOrder.length);
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration());
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration());
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration());
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration());
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration());
- mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[0],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[3],
+ ser.getFieldSerializers()[3].snapshotConfiguration()));
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[1],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[2],
+ ser.getFieldSerializers()[2].snapshotConfiguration()));
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[2],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[5],
+ ser.getFieldSerializers()[5].snapshotConfiguration()));
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[3],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[0],
+ ser.getFieldSerializers()[0].snapshotConfiguration()));
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[4],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[1],
+ ser.getFieldSerializers()[1].snapshotConfiguration()));
+ mockOriginalFieldToSerializerConfigSnapshot.put(
+ mockOriginalFieldOrder[5],
+ new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
+ ser.getFieldSerializers()[4],
+ ser.getFieldSerializers()[4].snapshotConfiguration()));
PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
@@ -494,13 +530,11 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
new PojoSerializer.PojoSerializerConfigSnapshot<>(
TestUserClass.class,
mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
- new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
- new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+ new LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>(), // empty; irrelevant for this test
+ new HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>()); // empty; irrelevant for this test
// reconfigure - check reconfiguration result and that fields are reordered to the previous order
- CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(
-
- mockPreviousConfigSnapshot);
+ CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
assertFalse(compatResult.isRequiresMigration());
int i = 0;
for (Field field : mockOriginalFieldOrder) {
@@ -508,4 +542,74 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
i++;
}
}
+
+ /**
+  * Verifies that a {@link PojoSerializer} config snapshot can still be read back, and does
+  * not require migration, when reading the serializers nested in the snapshot fails.
+  */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSerializerSerializationFailureResilience() throws Exception {
+     PojoSerializer<TestUserClass> pojoSerializer =
+         (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+     // take a snapshot of the serializer configuration and turn it into bytes
+     PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> originalSnapshot =
+         pojoSerializer.snapshotConfiguration();
+     byte[] snapshotBytes;
+     try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream()) {
+         DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(bytesOut);
+         TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outView, originalSnapshot);
+         snapshotBytes = bytesOut.toByteArray();
+     }
+
+     // replace construction of the serialization proxy with a mock whose read() throws
+     TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> failingProxy =
+         mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
+     doThrow(new IOException()).when(failingProxy).read(any(DataInputViewStreamWrapper.class));
+     PowerMockito
+         .whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class)
+         .withAnyArguments()
+         .thenReturn(failingProxy);
+
+     // restore the snapshot from the serialized bytes
+     ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
+     PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> restoredSnapshot;
+     try (ByteArrayInputStream bytesIn = new ByteArrayInputStream(snapshotBytes)) {
+         restoredSnapshot = (PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>)
+             TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+                 new DataInputViewStreamWrapper(bytesIn), userClassLoader);
+     }
+
+     Assert.assertFalse(pojoSerializer.ensureCompatibility(restoredSnapshot).isRequiresMigration());
+     verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(originalSnapshot, restoredSnapshot);
+ }
+
+ /**
+  * Checks a POJO serializer config snapshot that was read back while deserialization of its
+  * nested serializers was failing.
+  *
+  * <p>For both the field entries and the registered-subclass entries of
+  * {@code deserializedConfig}, the serializer ({@code f0}) must be {@code null} and the
+  * nested config snapshot ({@code f1}) must equal the corresponding one in {@code original}.
+  * Nested POJO config snapshots are verified recursively.
+  *
+  * @param original the config snapshot as it was before serialization
+  * @param deserializedConfig the config snapshot that was read back
+  */
+ private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+         PojoSerializer.PojoSerializerConfigSnapshot<?> original,
+         PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
+
+     verifySerializersDroppedAndConfigSnapshotsRetained(
+         original.getFieldToSerializerConfigSnapshot(),
+         deserializedConfig.getFieldToSerializerConfigSnapshot());
+
+     verifySerializersDroppedAndConfigSnapshotsRetained(
+         original.getRegisteredSubclassesToSerializerConfigSnapshots(),
+         deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots());
+ }
+
+ /**
+  * Asserts, for every entry of {@code deserializedEntries}, that the serializer is
+  * {@code null} and that the config snapshot matches the entry with the same key in
+  * {@code originalEntries}.
+  *
+  * @param originalEntries serializer / config snapshot pairs before serialization
+  * @param deserializedEntries serializer / config snapshot pairs that were read back
+  */
+ private static <K> void verifySerializersDroppedAndConfigSnapshotsRetained(
+         Map<K, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalEntries,
+         Map<K, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> deserializedEntries) {
+
+     for (Map.Entry<K, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+             : deserializedEntries.entrySet()) {
+
+         Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> deserialized = entry.getValue();
+         Assert.assertNull(deserialized.f0);
+
+         // fail with a readable message rather than a NullPointerException on a missing key
+         Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> expected =
+             originalEntries.get(entry.getKey());
+         Assert.assertNotNull("No original entry found for key " + entry.getKey(), expected);
+
+         if (deserialized.f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
+             // nested POJO snapshots carry their own serializer entries; descend into them
+             verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
+                 (PojoSerializer.PojoSerializerConfigSnapshot<?>) expected.f1,
+                 (PojoSerializer.PojoSerializerConfigSnapshot<?>) deserialized.f1);
+         } else {
+             Assert.assertEquals(expected.f1, deserialized.f1);
+         }
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 860c560..5a404bd 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -25,7 +25,7 @@ import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Test;
@@ -53,7 +53,7 @@ public class KryoSerializerCompatibilityTest {
TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -61,7 +61,7 @@ public class KryoSerializerCompatibilityTest {
// read configuration again from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -91,7 +91,7 @@ public class KryoSerializerCompatibilityTest {
TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
byte[] serializedConfig;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+ TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
serializedConfig = out.toByteArray();
}
@@ -104,7 +104,7 @@ public class KryoSerializerCompatibilityTest {
// read configuration from bytes
try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
- kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}