You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:03 UTC
[5/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades
for managed state
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 1a9c8f9..08da49e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -23,22 +23,25 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,22 +56,55 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
- private final Class<T> clazz;
+ // --------------------------------------------------------------------------------------------
+ // PojoSerializer parameters
+ // --------------------------------------------------------------------------------------------
- private final TypeSerializer<Object>[] fieldSerializers;
+ /** The POJO type class. */
+ private final Class<T> clazz;
+ /**
+ * Fields of the POJO and their serializers.
+ *
+ * <p>The fields are kept as a separate transient member, with their serialization
+ * handled with the {@link #readObject(ObjectInputStream)} and {@link #writeObject(ObjectOutputStream)}
+ * methods.
+ *
+ * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+ */
+ private transient Field[] fields;
+ private TypeSerializer<Object>[] fieldSerializers;
private final int numFields;
- private final Map<Class<?>, Integer> registeredClasses;
-
- private final TypeSerializer<?>[] registeredSerializers;
-
+ /**
+ * Registered subclasses and their serializers.
+ * The mapping from each subclass to its registered class tag is maintained as a separate map, ordered by the class tag.
+ *
+ * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+ */
+ private LinkedHashMap<Class<?>, Integer> registeredClasses;
+ private TypeSerializer<?>[] registeredSerializers;
+
+ /**
+ * Cache of non-registered subclasses to their serializers, created on-the-fly.
+ *
+ * <p>This cache is persisted and will be repopulated with reconfigured serializers
+ * in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+ */
+ private transient HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Configuration of the current execution.
+ *
+ * <p>Nested serializers created using this will have the most up-to-date configuration,
+ * and can be resolved for backwards compatibility with previous configuration
+ * snapshots in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}.
+ */
private final ExecutionConfig executionConfig;
- private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
private transient ClassLoader cl;
- // We need to handle these ourselves in writeObject()/readObject()
- private transient Field[] fields;
@SuppressWarnings("unchecked")
public PojoSerializer(
@@ -83,93 +119,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
this.numFields = fieldSerializers.length;
this.executionConfig = checkNotNull(executionConfig);
- LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
-
for (int i = 0; i < numFields; i++) {
this.fields[i].setAccessible(true);
}
cl = Thread.currentThread().getContextClassLoader();
- subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
-
// We only want those classes that are not our own class and are actually sub-classes.
- List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size());
- for (Class<?> registeredClass: registeredPojoTypes) {
- if (registeredClass.equals(clazz)) {
- continue;
- }
- if (!clazz.isAssignableFrom(registeredClass)) {
- continue;
- }
- cleanedTaggedClasses.add(registeredClass);
-
- }
- this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size());
- registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
+ LinkedHashSet<Class<?>> registeredSubclasses =
+ getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);
- int id = 0;
- for (Class<?> registeredClass: cleanedTaggedClasses) {
- this.registeredClasses.put(registeredClass, id);
- TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass);
- registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
+ this.registeredClasses = createRegisteredSubclassTags(registeredSubclasses);
+ this.registeredSerializers = createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);
- id++;
- }
- }
-
- private void writeObject(ObjectOutputStream out)
- throws IOException, ClassNotFoundException {
- out.defaultWriteObject();
- out.writeInt(fields.length);
- for (Field field: fields) {
- FieldSerializer.serializeField(field, out);
- }
- }
-
- private void readObject(ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- int numFields = in.readInt();
- fields = new Field[numFields];
- for (int i = 0; i < numFields; i++) {
- fields[i] = FieldSerializer.deserializeField(in);
- }
-
- cl = Thread.currentThread().getContextClassLoader();
- subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
- }
-
- private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
- TypeSerializer<?> result = subclassSerializerCache.get(subclass);
- if (result == null) {
-
- TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass);
- result = typeInfo.createSerializer(executionConfig);
- if (result instanceof PojoSerializer) {
- PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result;
- subclassSerializer.copyBaseFieldOrder(this);
- }
- subclassSerializerCache.put(subclass, result);
-
- }
- return result;
- }
-
- @SuppressWarnings("unused")
- private boolean hasField(Field f) {
- for (Field field: fields) {
- if (f.equals(field)) {
- return true;
- }
- }
- return false;
- }
-
- private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
- // do nothing for now, but in the future, adapt subclass serializer to have same
- // ordering as base class serializer so that binary comparison on base class fields
- // can work
+ this.subclassSerializerCache = new HashMap<>();
}
@Override
@@ -296,7 +259,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
} catch (IllegalAccessException e) {
- throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
} else {
TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
@@ -341,13 +304,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
target.writeByte(flags);
+ // if it's a registered (tagged) subclass, write the class tag id; for any other subclass, write the full class name
if ((flags & IS_SUBCLASS) != 0) {
target.writeUTF(actualClass.getName());
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
target.writeByte(subclassTag);
}
-
+ // if it's a subclass, use the corresponding subclass serializer,
+ // otherwise serialize each field with our field serializers
if ((flags & NO_SUBCLASS) != 0) {
try {
for (int i = 0; i < numFields; i++) {
@@ -360,8 +325,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
} catch (IllegalAccessException e) {
- throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
} else {
// subclass
@@ -418,8 +382,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
} catch (IllegalAccessException e) {
- throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before.");
-
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
} else {
if (subclassSerializer != null) {
@@ -493,8 +456,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
} catch (IllegalAccessException e) {
- throw new RuntimeException(
- "Error during POJO copy, this should not happen since we check the fields before.");
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e);
}
} else {
if (subclassSerializer != null) {
@@ -574,4 +536,527 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
public boolean canEqual(Object obj) {
return obj instanceof PojoSerializer;
}
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+	/**
+	 * Captures the current parameters of this serializer (POJO class, fields, registered
+	 * subclasses and the non-registered subclass serializer cache) together with the
+	 * configuration snapshots of all nested serializers.
+	 */
+	@Override
+	public PojoSerializerConfigSnapshot<T> snapshotConfiguration() {
+		final PojoSerializerConfigSnapshot<T> snapshot = buildConfigSnapshot(
+			clazz, registeredClasses, registeredSerializers, fields, fieldSerializers, subclassSerializerCache);
+
+		return snapshot;
+	}
+
+ /**
+ * Checks whether this serializer can be used for data written with the given previous
+ * configuration and, if so, reconfigures this serializer to match it.
+ *
+ * <p>Compatibility is only reported if the snapshot is a {@link PojoSerializerConfigSnapshot}
+ * for the same POJO class with the same number of fields, every field of the snapshot is still
+ * known to this serializer, and none of the nested field serializers, registered subclass
+ * serializers or previously cached subclass serializers requires migration. Only then are the
+ * fields, field serializers, subclass registrations and the subclass serializer cache of this
+ * serializer replaced, following the order recorded in the snapshot.
+ *
+ * @param configSnapshot configuration snapshot of the serializer used for the previous data
+ * @return {@link CompatibilityResult#compatible()} if this serializer was reconfigured,
+ * otherwise the result of {@code CompatibilityResult.requiresMigration(null)}
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
+ final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot;
+
+ if (clazz.equals(config.getTypeClass())) {
+ if (this.numFields == config.getFieldToSerializerConfigSnapshot().size()) {
+
+ CompatibilityResult<?> compatResult;
+
+ // ----------- check field order and compatibility of field serializers -----------
+
+ // reordered fields and their serializers;
+ // this won't be applied to this serializer until all compatibility checks have been completed
+ final Field[] reorderedFields = new Field[this.numFields];
+ final TypeSerializer<Object>[] reorderedFieldSerializers =
+ (TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
+
+ // i is the position of the field in the previous (snapshotted) order
+ int i = 0;
+ for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry
+ : config.getFieldToSerializerConfigSnapshot().entrySet()) {
+
+ // fieldIndex is the position of the same field in this serializer's current order
+ int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
+ if (fieldIndex != -1) {
+ reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
+
+ compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
+ if (compatResult.requiresMigration()) {
+ return CompatibilityResult.requiresMigration(null);
+ } else {
+ reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
+ }
+ } else {
+ // a field of the previous configuration is not a field of this serializer
+ return CompatibilityResult.requiresMigration(null);
+ }
+
+ i++;
+ }
+
+ // ---- check subclass registration order and compatibility of registered serializers ----
+
+ // reordered subclass registrations and their serializers;
+ // this won't be applied to this serializer until all compatibility checks have been completed
+ final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags;
+ final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers;
+
+ final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations =
+ config.getRegisteredSubclassesToSerializerConfigSnapshots();
+
+ // the reconfigured list of registered subclasses will be the previous registered
+ // subclasses in the original order with new subclasses appended at the end
+ LinkedHashSet<Class<?>> reorderedRegisteredSubclasses = new LinkedHashSet<>();
+ reorderedRegisteredSubclasses.addAll(previousRegistrations.keySet());
+ reorderedRegisteredSubclasses.addAll(
+ getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig));
+
+ // re-establish the registered class tags and serializers
+ reorderedRegisteredSubclassesToClasstags = createRegisteredSubclassTags(reorderedRegisteredSubclasses);
+ reorderedRegisteredSubclassSerializers = createRegisteredSubclassSerializers(
+ reorderedRegisteredSubclasses, executionConfig);
+
+ // the previous registrations come first in the reordered set, so the i-th previous
+ // config snapshot corresponds to the i-th newly created subclass serializer
+ i = 0;
+ for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
+ // check compatibility of subclass serializer
+ compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
+ if (compatResult.requiresMigration()) {
+ return CompatibilityResult.requiresMigration(null);
+ }
+
+ i++;
+ }
+
+ // ------------------ ensure compatibility of non-registered subclass serializers ------------------
+
+ // the rebuilt cache for non-registered subclass serializers;
+ // this won't be applied to this serializer until all compatibility checks have been completed
+ HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>();
+
+ for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry
+ : config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
+
+ TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey());
+
+ // check compatibility of cached subclass serializer
+ compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
+ if (compatResult.requiresMigration()) {
+ return CompatibilityResult.requiresMigration(null);
+ } else {
+ rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
+ }
+ }
+
+ // completed compatibility checks; up to this point, we can just reconfigure
+ // the serializer so that it is compatible and migration is not required
+
+ this.fields = reorderedFields;
+ this.fieldSerializers = reorderedFieldSerializers;
+
+ this.registeredClasses = reorderedRegisteredSubclassesToClasstags;
+ this.registeredSerializers = reorderedRegisteredSubclassSerializers;
+
+ this.subclassSerializerCache = rebuiltCache;
+
+ return CompatibilityResult.compatible();
+ }
+ }
+ }
+
+ // reached when the snapshot type, the POJO class, or the number of fields does not match
+ return CompatibilityResult.requiresMigration(null);
+ }
+
+ /**
+ * Configuration snapshot of a {@link PojoSerializer}.
+ *
+ * <p>Besides the POJO type, which is handed to the superclass, it holds the configuration
+ * snapshots of the field serializers, of the registered subclass serializers, and of the
+ * serializers that were cached for non-registered subclasses.
+ *
+ * @param <T> the POJO type
+ */
+ public static final class PojoSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> {
+
+ private static final int VERSION = 1;
+
+ /**
+ * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers.
+ *
+ * <p>Ordering of the fields is kept so that new Pojo serializers for previous data
+ * may reorder the fields in case they are different. The order of the fields need to
+ * stay the same for binary compatibility, as the field order is part of the serialization format.
+ */
+ private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot;
+
+ /**
+ * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers.
+ *
+ * <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data
+ * may retain the same class tag used for registered subclasses. Newly registered subclasses that
+ * weren't present before should be appended with the next available class tag.
+ */
+ private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots;
+
+ /**
+ * Configuration snapshots of previously cached non-registered subclass serializers.
+ *
+ * <p>This is kept so that new Pojo serializers may eagerly repopulate their
+ * cache with reconfigured subclass serializers.
+ */
+ private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots;
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ public PojoSerializerConfigSnapshot() {}
+
+ /**
+ * Creates a snapshot from the given POJO type and nested serializer configuration snapshots.
+ * None of the three maps may be {@code null}.
+ *
+ * @param pojoType the POJO class, passed on to the superclass
+ * @param fieldToSerializerConfigSnapshot fields, in serialization order, with their serializer config snapshots
+ * @param registeredSubclassesToSerializerConfigSnapshots registered subclasses, in class tag order, with their serializer config snapshots
+ * @param nonRegisteredSubclassesToSerializerConfigSnapshots cached non-registered subclasses with their serializer config snapshots
+ */
+ public PojoSerializerConfigSnapshot(
+ Class<T> pojoType,
+ LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot,
+ LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots,
+ HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) {
+
+ super(pojoType);
+
+ this.fieldToSerializerConfigSnapshot =
+ Preconditions.checkNotNull(fieldToSerializerConfigSnapshot);
+ this.registeredSubclassesToSerializerConfigSnapshots =
+ Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots);
+ this.nonRegisteredSubclassesToSerializerConfigSnapshots =
+ Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots);
+ }
+
+ /**
+ * Writes the superclass data followed by three sections: fields, registered subclasses and
+ * cached non-registered subclasses. Each section starts with its entry count; each entry is
+ * the field name or class name followed by the serializer configuration snapshot.
+ */
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ // --- write fields and their serializers, in order
+
+ out.writeInt(fieldToSerializerConfigSnapshot.size());
+ for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry
+ : fieldToSerializerConfigSnapshot.entrySet()) {
+ // only the field name is written; read() resolves it against the POJO class hierarchy
+ out.writeUTF(entry.getKey().getName());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+ }
+
+ // --- write registered subclasses and their serializers, in registration order
+
+ out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size());
+ for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
+ : registeredSubclassesToSerializerConfigSnapshots.entrySet()) {
+ out.writeUTF(entry.getKey().getName());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+ }
+
+ // --- write snapshot of non-registered subclass serializer cache
+
+ out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size());
+ for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry
+ : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) {
+ out.writeUTF(entry.getKey().getName());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue());
+ }
+ }
+
+ /**
+ * Reads the three sections in the order in which {@link #write(DataOutputView)} wrote them.
+ *
+ * @throws IOException if a field name cannot be found in the POJO class or any of its
+ * superclasses, or if a subclass name cannot be loaded with the user code class loader
+ */
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ // --- read fields and their serializers, in order
+
+ int numFields = in.readInt();
+ this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
+ String fieldName;
+ Field field;
+ for (int i = 0; i < numFields; i++) {
+ fieldName = in.readUTF();
+
+ // search all superclasses for the field
+ Class<?> clazz = getTypeClass();
+ field = null;
+ while (clazz != null) {
+ try {
+ field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ break;
+ } catch (NoSuchFieldException e) {
+ // not declared on this class; continue with the superclass
+ clazz = clazz.getSuperclass();
+ }
+ }
+
+ if (field == null) {
+ // the field no longer exists in the POJO
+ throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
+ } else {
+ fieldToSerializerConfigSnapshot.put(
+ field,
+ TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+ }
+ }
+
+ // --- read registered subclasses and their serializers, in registration order
+
+ int numRegisteredSubclasses = in.readInt();
+ this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses);
+ String registeredSubclassname;
+ Class<?> registeredSubclass;
+ for (int i = 0; i < numRegisteredSubclasses; i++) {
+ registeredSubclassname = in.readUTF();
+ try {
+ registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e);
+ }
+
+ this.registeredSubclassesToSerializerConfigSnapshots.put(
+ registeredSubclass,
+ TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+ }
+
+ // --- read snapshot of non-registered subclass serializer cache
+
+ int numCachedSubclassSerializers = in.readInt();
+ this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers);
+ String cachedSubclassname;
+ Class<?> cachedSubclass;
+ for (int i = 0; i < numCachedSubclassSerializers; i++) {
+ cachedSubclassname = in.readUTF();
+ try {
+ cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e);
+ }
+
+ this.nonRegisteredSubclassesToSerializerConfigSnapshots.put(
+ cachedSubclass,
+ TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader()));
+ }
+ }
+
+ /** Returns the version of this snapshot class, currently {@code 1}. */
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ /** Returns the ordered map of fields to their serializer configuration snapshots. */
+ public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() {
+ return fieldToSerializerConfigSnapshot;
+ }
+
+ /** Returns the ordered map of registered subclasses to their serializer configuration snapshots. */
+ public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() {
+ return registeredSubclassesToSerializerConfigSnapshots;
+ }
+
+ /** Returns the map of cached non-registered subclasses to their serializer configuration snapshots. */
+ public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() {
+ return nonRegisteredSubclassesToSerializerConfigSnapshots;
+ }
+
+ /**
+ * Two snapshots are equal if the superclass considers them equal and all three
+ * maps of nested serializer configuration snapshots are equal.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj)
+ && (obj instanceof PojoSerializerConfigSnapshot)
+ && fieldToSerializerConfigSnapshot.equals(((PojoSerializerConfigSnapshot) obj).getFieldToSerializerConfigSnapshot())
+ && registeredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).getRegisteredSubclassesToSerializerConfigSnapshots())
+ && nonRegisteredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).nonRegisteredSubclassesToSerializerConfigSnapshots);
+ }
+
+ /** Combines the superclass hash code with the hash of the three maps compared in {@link #equals(Object)}. */
+ @Override
+ public int hashCode() {
+ return super.hashCode()
+ + Objects.hash(
+ fieldToSerializerConfigSnapshot,
+ registeredSubclassesToSerializerConfigSnapshots,
+ nonRegisteredSubclassesToSerializerConfigSnapshots);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+	/**
+	 * Java serialization hook: writes the default (non-transient) state, followed by the number
+	 * of POJO fields and then each field via {@link FieldSerializer}, since the transient
+	 * {@code fields} array is not covered by default serialization.
+	 */
+	private void writeObject(ObjectOutputStream out)
+			throws IOException, ClassNotFoundException {
+		out.defaultWriteObject();
+
+		final int fieldCount = fields.length;
+		out.writeInt(fieldCount);
+		for (int idx = 0; idx < fieldCount; idx++) {
+			FieldSerializer.serializeField(fields[idx], out);
+		}
+	}
+
+	/**
+	 * Java serialization hook: restores the default state, then the POJO fields that
+	 * {@code writeObject} appended, and finally re-initializes the remaining transient members
+	 * (context class loader and an empty subclass serializer cache).
+	 */
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		final int fieldCount = in.readInt();
+		this.fields = new Field[fieldCount];
+		int idx = 0;
+		while (idx < fieldCount) {
+			this.fields[idx] = FieldSerializer.deserializeField(in);
+			idx++;
+		}
+
+		this.cl = Thread.currentThread().getContextClassLoader();
+		this.subclassSerializerCache = new HashMap<>();
+	}
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+	/**
+	 * Collects the POJO types registered in the execution config that are assignable to the
+	 * given base POJO class, excluding the base class itself. The iteration order of the
+	 * registered types is retained.
+	 */
+	private static LinkedHashSet<Class<?>> getRegisteredSubclassesFromExecutionConfig(
+			Class<?> basePojoClass,
+			ExecutionConfig executionConfig) {
+
+		LinkedHashSet<Class<?>> result = new LinkedHashSet<>(executionConfig.getRegisteredPojoTypes().size());
+
+		for (Class<?> candidate : executionConfig.getRegisteredPojoTypes()) {
+			final boolean isBaseClassItself = candidate.equals(basePojoClass);
+			if (!isBaseClassItself && basePojoClass.isAssignableFrom(candidate)) {
+				result.add(candidate);
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Assigns a class tag to every provided subclass.
+	 * Tags are consecutive integers starting at 0, handed out in the iteration order of the provided set.
+	 */
+	private static LinkedHashMap<Class<?>, Integer> createRegisteredSubclassTags(LinkedHashSet<Class<?>> registeredSubclasses) {
+		final LinkedHashMap<Class<?>, Integer> tagsBySubclass = new LinkedHashMap<>();
+
+		for (Class<?> subclass : registeredSubclasses) {
+			// the input is a set, so the number of entries added so far is exactly the next free tag
+			tagsBySubclass.put(subclass, tagsBySubclass.size());
+		}
+
+		return tagsBySubclass;
+	}
+
+	/**
+	 * Creates one serializer per provided registered subclass, using the given execution config.
+	 * The position of each serializer in the returned array matches the iteration order of the subclasses.
+	 */
+	private static TypeSerializer<?>[] createRegisteredSubclassSerializers(
+			LinkedHashSet<Class<?>> registeredSubclasses,
+			ExecutionConfig executionConfig) {
+
+		final TypeSerializer<?>[] serializers = new TypeSerializer[registeredSubclasses.size()];
+
+		int nextIndex = 0;
+		for (Class<?> subclass : registeredSubclasses) {
+			serializers[nextIndex++] = TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig);
+		}
+
+		return serializers;
+	}
+
+	/**
+	 * Returns the serializer for a non-registered subclass, creating and caching it
+	 * on first request.
+	 *
+	 * <p>This method has package-private visibility so that it can also be used in tests.
+	 */
+	TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
+		final TypeSerializer<?> cached = subclassSerializerCache.get(subclass);
+		if (cached != null) {
+			return cached;
+		}
+
+		final TypeSerializer<?> created = createSubclassSerializer(subclass);
+		subclassSerializerCache.put(subclass, created);
+		return created;
+	}
+
+	/**
+	 * Creates a new serializer for the given subclass with the current execution config.
+	 * If the result is itself a {@link PojoSerializer}, it is handed this serializer
+	 * through {@code copyBaseFieldOrder}.
+	 */
+	private TypeSerializer<?> createSubclassSerializer(Class<?> subclass) {
+		final TypeSerializer<?> created = TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig);
+
+		if (created instanceof PojoSerializer) {
+			((PojoSerializer<?>) created).copyBaseFieldOrder(this);
+		}
+
+		return created;
+	}
+
+	/**
+	 * Looks up the position (0-based) of the given field within this serializer's fields.
+	 *
+	 * @return the index of the first field equal to {@code f}, or -1 if there is none
+	 */
+	private int findField(Field f) {
+		for (int index = 0; index < fields.length; index++) {
+			if (f.equals(fields[index])) {
+				return index;
+			}
+		}
+
+		return -1;
+	}
+
+ /**
+ * Placeholder that is invoked on a subclass serializer with the serializer of its base class.
+ * It currently has no effect.
+ *
+ * @param baseSerializer the serializer of the base class (unused for now)
+ */
+ private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+ // do nothing for now, but in the future, adapt subclass serializer to have same
+ // ordering as base class serializer so that binary comparison on base class fields
+ // can work
+ }
+
+	/**
+	 * Builds a snapshot of the serializer's parameters and currently cached serializers.
+	 *
+	 * <p>Fields and registered subclasses are stored in ordered maps because their order is
+	 * needed again when a serializer is reconfigured from the snapshot. The cache of
+	 * non-registered subclass serializers is stored in a plain {@link HashMap}, matching the
+	 * snapshot's field type and the map created when a snapshot is read back.
+	 *
+	 * @param pojoType the POJO class
+	 * @param registeredSubclassesToTags registered subclasses mapped to their class tags; each tag is
+	 *                                   used as the index into {@code registeredSubclassSerializers}
+	 * @param registeredSubclassSerializers serializers of the registered subclasses, indexed by class tag
+	 * @param fields the POJO fields, in serialization order
+	 * @param fieldSerializers the field serializers; {@code fieldSerializers[i]} belongs to {@code fields[i]}
+	 * @param nonRegisteredSubclassSerializerCache serializers cached for non-registered subclasses
+	 * @return the configuration snapshot
+	 */
+	private static <T> PojoSerializerConfigSnapshot<T> buildConfigSnapshot(
+			Class<T> pojoType,
+			LinkedHashMap<Class<?>, Integer> registeredSubclassesToTags,
+			TypeSerializer<?>[] registeredSubclassSerializers,
+			Field[] fields,
+			TypeSerializer<?>[] fieldSerializers,
+			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
+
+		final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots =
+			new LinkedHashMap<>(fields.length);
+
+		for (int i = 0; i < fields.length; i++) {
+			fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration());
+		}
+
+		final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots =
+			new LinkedHashMap<>(registeredSubclassesToTags.size());
+
+		for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) {
+			// the class tag doubles as the index of the subclass' serializer
+			registeredSubclassesToSerializerConfigSnapshots.put(
+				entry.getKey(),
+				registeredSubclassSerializers[entry.getValue()].snapshotConfiguration());
+		}
+
+		// no ordering is required for the cache, so a plain HashMap is used (same as in read())
+		final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots =
+			new HashMap<>(nonRegisteredSubclassSerializerCache.size());
+
+		for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) {
+			nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration());
+		}
+
+		return new PojoSerializerConfigSnapshot<>(
+			pojoType,
+			fieldToSerializerConfigSnapshots,
+			registeredSubclassesToSerializerConfigSnapshots,
+			nonRegisteredSubclassesToSerializerConfigSnapshots);
+	}
+
+ // --------------------------------------------------------------------------------------------
+ // Test utilities
+ // --------------------------------------------------------------------------------------------
+
+ /** Returns the POJO fields in their current order (the internal array, not a copy). */
+ @VisibleForTesting
+ Field[] getFields() {
+ return fields;
+ }
+
+ /** Returns the field serializers in their current order (the internal array, not a copy). */
+ @VisibleForTesting
+ TypeSerializer<?>[] getFieldSerializers() {
+ return fieldSerializers;
+ }
+
+ /** Returns the map of registered subclasses to their class tags (the internal map, not a copy). */
+ @VisibleForTesting
+ LinkedHashMap<Class<?>, Integer> getRegisteredClasses() {
+ return registeredClasses;
+ }
+
+ /** Returns the serializers of the registered subclasses (the internal array, not a copy). */
+ @VisibleForTesting
+ TypeSerializer<?>[] getRegisteredSerializers() {
+ return registeredSerializers;
+ }
+
+ /** Returns the cache of serializers for non-registered subclasses (the internal map, not a copy). */
+ @VisibleForTesting
+ HashMap<Class<?>, TypeSerializer<?>> getSubclassSerializerCache() {
+ return subclassSerializerCache;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index dbd5d3a..5770dac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -18,12 +18,17 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Row;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.Arrays;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
@@ -35,12 +40,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Serializer for {@link Row}.
*/
@Internal
-public class RowSerializer extends TypeSerializer<Row> {
+public final class RowSerializer extends TypeSerializer<Row> {
private static final long serialVersionUID = 1L;
- private final boolean[] nullMask;
+
private final TypeSerializer<Object>[] fieldSerializers;
+ private transient boolean[] nullMask;
+
+ @SuppressWarnings("unchecked")
public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
this.nullMask = new boolean[fieldSerializers.length];
@@ -231,4 +239,73 @@ public class RowSerializer extends TypeSerializer<Row> {
public int hashCode() {
return Arrays.hashCode(fieldSerializers);
}
+
+ // --------------------------------------------------------------------------------------------
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.nullMask = new boolean[fieldSerializers.length];
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a configuration snapshot for this serializer, built from the configuration
+     * snapshots of its field serializers.
+     */
+    @Override
+    public RowSerializerConfigSnapshot snapshotConfiguration() {
+        final TypeSerializerConfigSnapshot[] fieldSnapshots =
+            TypeSerializerUtil.snapshotConfigurations(fieldSerializers);
+
+        return new RowSerializerConfigSnapshot(fieldSnapshots);
+    }
+
+    /**
+     * Checks this serializer against a previously written configuration snapshot.
+     *
+     * <p>The serializer is compatible only if the snapshot is a {@link RowSerializerConfigSnapshot}
+     * with the same number of fields and every field serializer reports compatibility with its
+     * corresponding nested snapshot. If at least one field requires migration, a fallback
+     * {@code RowSerializer} is offered as convert deserializer, provided that every migrating
+     * field supplies its own convert deserializer.
+     *
+     * @param configSnapshot the configuration snapshot of the serializer that wrote the old data
+     * @return {@code compatible()}, or {@code requiresMigration(...)} with a convert deserializer
+     *         ({@code null} if none can be provided)
+     */
+    @Override
+    public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+        if (!(configSnapshot instanceof RowSerializerConfigSnapshot)) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
+            ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+        if (fieldSerializerConfigSnapshots.length != fieldSerializers.length) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        boolean requireMigration = false;
+        TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
+
+        for (int i = 0; i < fieldSerializers.length; i++) {
+            CompatibilityResult<?> compatResult =
+                fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+
+            if (!compatResult.requiresMigration()) {
+                // A compatible field serializer can read the old data itself. Record it so that
+                // the fallback RowSerializer never ends up with a null field serializer when
+                // only some of the fields require migration.
+                convertDeserializers[i] = fieldSerializers[i];
+                continue;
+            }
+
+            requireMigration = true;
+
+            if (compatResult.getConvertDeserializer() == null) {
+                // one of the field serializers cannot provide a fallback deserializer
+                return CompatibilityResult.requiresMigration(null);
+            }
+
+            convertDeserializers[i] = compatResult.getConvertDeserializer();
+        }
+
+        if (requireMigration) {
+            return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers));
+        } else {
+            return CompatibilityResult.compatible();
+        }
+    }
+
+    /**
+     * Configuration snapshot of a {@link RowSerializer}; it carries the configuration snapshots
+     * of the row's field serializers.
+     */
+    public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+        /** Version of this snapshot class, as reported by {@link #getVersion()}. */
+        private static final int VERSION = 1;
+
+        /** This empty nullary constructor is required for deserializing the configuration. */
+        public RowSerializerConfigSnapshot() {}
+
+        /**
+         * Creates a snapshot from the given field serializer configuration snapshots.
+         *
+         * @param fieldSerializerConfigSnapshots configuration snapshots of the field serializers
+         */
+        public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+            super(fieldSerializerConfigSnapshots);
+        }
+
+        @Override
+        public int getVersion() {
+            return VERSION;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index afc4aa2..68d5aa8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,7 +19,10 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -115,4 +118,43 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
public boolean canEqual(Object obj) {
return obj instanceof TupleSerializerBase;
}
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a configuration snapshot holding the tuple class and the configuration snapshots
+     * of the field serializers.
+     */
+    @Override
+    public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
+        final TypeSerializerConfigSnapshot[] fieldSnapshots =
+            TypeSerializerUtil.snapshotConfigurations(fieldSerializers);
+
+        return new TupleSerializerConfigSnapshot<>(tupleClass, fieldSnapshots);
+    }
+
+    /**
+     * Checks this serializer against a previously written configuration snapshot.
+     *
+     * <p>Compatibility is reported only if the snapshot is a {@link TupleSerializerConfigSnapshot}
+     * for the same tuple class, with the same number of fields, and every field serializer is
+     * compatible with its nested snapshot. In every other case migration is required and no
+     * convert deserializer is offered.
+     *
+     * @param configSnapshot the configuration snapshot of the serializer that wrote the old data
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+        if (!(configSnapshot instanceof TupleSerializerConfigSnapshot)) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
+
+        if (!tupleClass.equals(config.getTupleClass())) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        final TypeSerializerConfigSnapshot[] nestedSnapshots = config.getNestedSerializerConfigSnapshots();
+
+        if (nestedSnapshots.length != fieldSerializers.length) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        // a single field that needs migration makes the whole tuple serializer require migration
+        for (int i = 0; i < fieldSerializers.length; i++) {
+            if (fieldSerializers[i].ensureCompatibility(nestedSnapshots[i]).requiresMigration()) {
+                return CompatibilityResult.requiresMigration(null);
+            }
+        }
+
+        return CompatibilityResult.compatible();
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
new file mode 100644
index 0000000..6d2bb5f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Configuration snapshot of a tuple serializer. In addition to the nested field serializer
+ * configuration snapshots handled by the superclass, it records the tuple class.
+ *
+ * @param <T> the tuple type
+ */
+@Internal
+public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+    private static final int VERSION = 1;
+
+    /** The tuple class; set by the constructor or restored in {@link #read(DataInputView)}. */
+    private Class<T> tupleClass;
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    public TupleSerializerConfigSnapshot() {}
+
+    /**
+     * Creates a snapshot for the given tuple class and field serializer configuration snapshots.
+     *
+     * @param tupleClass the tuple class; must not be null
+     * @param fieldSerializerConfigSnapshots configuration snapshots of the field serializers
+     */
+    public TupleSerializerConfigSnapshot(
+            Class<T> tupleClass,
+            TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+
+        super(fieldSerializerConfigSnapshots);
+
+        this.tupleClass = Preconditions.checkNotNull(tupleClass);
+    }
+
+    /** Writes the superclass state first, followed by the Java-serialized tuple class. */
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        super.write(out);
+
+        final DataOutputViewStream outStream = new DataOutputViewStream(out);
+        InstantiationUtil.serializeObject(outStream, tupleClass);
+    }
+
+    /** Reads the superclass state first, then the tuple class using the user code class loader. */
+    @Override
+    public void read(DataInputView in) throws IOException {
+        super.read(in);
+
+        final DataInputViewStream inStream = new DataInputViewStream(in);
+        try {
+            tupleClass = InstantiationUtil.deserializeObject(inStream, getUserCodeClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Could not find requested tuple class in classpath.", e);
+        }
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    public Class<T> getTupleClass() {
+        return tupleClass;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!super.equals(obj)) {
+            return false;
+        }
+        if (!(obj instanceof TupleSerializerConfigSnapshot)) {
+            return false;
+        }
+
+        final TupleSerializerConfigSnapshot<?> other = (TupleSerializerConfigSnapshot<?>) obj;
+        return tupleClass.equals(other.getTupleClass());
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + tupleClass.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 56e204c..10e2330 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -19,15 +19,20 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.util.Preconditions;
import org.objenesis.strategy.StdInstantiatorStrategy;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,12 +44,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <T> The type serialized.
*/
@Internal
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+public final class ValueSerializer<T extends Value> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
private final Class<T> type;
-
+
+ /**
+ * Map of class tag (using classname as tag) to their Kryo registration.
+ *
+ * <p>This map serves as a preview of the final registration result of
+ * the Kryo instance, taking into account registration overwrites.
+ *
+ * <p>Currently, we only have one single registration for the value type.
+ * Nevertheless, we keep this information here for future compatibility.
+ */
+ private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
private transient Kryo kryo;
private transient T copyInstance;
@@ -53,6 +69,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
public ValueSerializer(Class<T> type) {
this.type = checkNotNull(type);
+ this.kryoRegistrations = asKryoRegistrations(type);
}
// --------------------------------------------------------------------------------------------
@@ -126,7 +143,8 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
kryo.setInstantiatorStrategy(instantiatorStrategy);
this.kryo.setAsmEnabled(true);
- this.kryo.register(type);
+
+ KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
}
}
@@ -152,4 +170,66 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
public boolean canEqual(Object obj) {
return obj instanceof ValueSerializer;
}
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+    /** Creates a configuration snapshot for the value type handled by this serializer. */
+    @Override
+    public ValueSerializerConfigSnapshot<T> snapshotConfiguration() {
+        final ValueSerializerConfigSnapshot<T> snapshot = new ValueSerializerConfigSnapshot<>(type);
+        return snapshot;
+    }
+
+    /**
+     * Checks this serializer against a previously written configuration snapshot.
+     *
+     * <p>Compatibility is reported only if the snapshot is a {@link ValueSerializerConfigSnapshot}
+     * for the same value class; otherwise migration is required and no convert deserializer
+     * is offered.
+     *
+     * @param configSnapshot the configuration snapshot of the serializer that wrote the old data
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+        if (!(configSnapshot instanceof ValueSerializerConfigSnapshot)) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        final ValueSerializerConfigSnapshot<T> config = (ValueSerializerConfigSnapshot<T>) configSnapshot;
+
+        if (!type.equals(config.getTypeClass())) {
+            return CompatibilityResult.requiresMigration(null);
+        }
+
+        // currently, simply checking the type of the value class is sufficient;
+        // in the future, if there are more Kryo registrations, we should try to resolve that
+        return CompatibilityResult.compatible();
+    }
+
+    /**
+     * Configuration snapshot of a {@link ValueSerializer}; it carries the value class and the
+     * Kryo registration derived from it.
+     *
+     * @param <T> the value type
+     */
+    public static class ValueSerializerConfigSnapshot<T extends Value> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+        /** Version of this snapshot class, as reported by {@link #getVersion()}. */
+        private static final int VERSION = 1;
+
+        /** This empty nullary constructor is required for deserializing the configuration. */
+        public ValueSerializerConfigSnapshot() {}
+
+        /**
+         * Creates a snapshot for the given value class.
+         *
+         * @param valueTypeClass the class of the serialized value type
+         */
+        public ValueSerializerConfigSnapshot(Class<T> valueTypeClass) {
+            super(valueTypeClass, asKryoRegistrations(valueTypeClass));
+        }
+
+        @Override
+        public int getVersion() {
+            return VERSION;
+        }
+    }
+
+ // --------------------------------------------------------------------------------------------
+
+    /**
+     * Java deserialization hook. Restores the serialized state and, if no Kryo registrations
+     * were restored, rebuilds them from the value type.
+     */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        if (kryoRegistrations != null) {
+            return;
+        }
+
+        // kryoRegistrations may be null if this value serializer is deserialized from an old version
+        this.kryoRegistrations = asKryoRegistrations(type);
+    }
+
+    /**
+     * Builds the single-entry Kryo registration map for the given value type, keyed by the
+     * type's class name.
+     *
+     * @param type the value class to register; must not be null
+     * @return a map from the class name of {@code type} to its Kryo registration
+     */
+    private static LinkedHashMap<String, KryoRegistration> asKryoRegistrations(Class<?> type) {
+        Preconditions.checkNotNull(type);
+
+        LinkedHashMap<String, KryoRegistration> registration = new LinkedHashMap<>(1);
+        // Use type.getName(): type.getClass().getName() is always "java.lang.Class" and would
+        // not identify the registered class.
+        registration.put(type.getName(), new KryoRegistration(type));
+
+        return registration;
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index cba0c84..a172b72 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -21,18 +21,22 @@ package org.apache.flink.api.java.typeutils.runtime.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.generic.GenericData;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -45,6 +49,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
@@ -72,11 +77,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// ------------------------------------------------------------------------
- private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
- private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
- private final LinkedHashSet<Class<?>> registeredTypes;
+
+ /**
+ * Map of class tag (using classname as tag) to their Kryo registration.
+ *
+ * <p>This map serves as a preview of the final registration result of
+ * the Kryo instance, taking into account registration overwrites.
+ */
+ private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
private final Class<T> type;
@@ -93,26 +103,35 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private transient Output output;
// ------------------------------------------------------------------------
+ // legacy fields; these fields cannot yet be removed to retain backwards compatibility
+
+ private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+ private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
+ private LinkedHashSet<Class<?>> registeredTypes;
+
+ // ------------------------------------------------------------------------
public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = checkNotNull(type);
this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
- this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
- this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
- this.registeredTypes = executionConfig.getRegisteredKryoTypes();
+
+ this.kryoRegistrations = buildKryoRegistrations(
+ this.type,
+ executionConfig.getRegisteredKryoTypes(),
+ executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
+ executionConfig.getRegisteredTypesWithKryoSerializers());
}
/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
- registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
- registeredTypes = toCopy.registeredTypes;
+
+ kryoRegistrations = toCopy.kryoRegistrations;
type = toCopy.type;
if(type == null){
@@ -272,11 +291,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
- // we cannot include the Serializers here because they don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
- registeredTypes.equals(other.registeredTypes) &&
- registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
+ kryoRegistrations.equals(other.kryoRegistrations) &&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
@@ -334,7 +351,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details.
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
- // Add default serializers first, so that they type registrations without a serializer
+ // Add default serializers first, so that the type registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
@@ -344,59 +361,152 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}
- // register the type of our class
- kryo.register(type);
+ KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
- // register given types. we do this first so that any registration of a
- // more specific serializer overrides this
- for (Class<?> type : registeredTypes) {
- kryo.register(type);
- }
+ kryo.setRegistrationRequired(false);
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ }
+ }
- // register given serializer classes
- for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
- Class<?> typeClass = e.getKey();
- Class<? extends Serializer<?>> serializerClass = e.getValue();
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
- Serializer<?> serializer =
- ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
- kryo.register(typeClass, serializer);
- }
+    /** Creates a configuration snapshot from the serialized type and the current Kryo registrations. */
+    @Override
+    public KryoSerializerConfigSnapshot<T> snapshotConfiguration() {
+        final KryoSerializerConfigSnapshot<T> snapshot =
+            new KryoSerializerConfigSnapshot<>(type, kryoRegistrations);
+
+        return snapshot;
+    }
- // register given serializers
- for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
- kryo.register(e.getKey(), e.getValue().getSerializer());
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
+ final KryoSerializerConfigSnapshot<T> config = (KryoSerializerConfigSnapshot<T>) configSnapshot;
+
+ if (type.equals(config.getTypeClass())) {
+ LinkedHashMap<String, KryoRegistration> reconfiguredRegistrations = config.getKryoRegistrations();
+
+ // reconfigure by assuring that classes which were previously registered are registered
+ // again in the exact same order; new class registrations will be appended.
+ // this also overwrites any dummy placeholders that the restored old configuration has
+ reconfiguredRegistrations.putAll(kryoRegistrations);
+
+ // check if there are still any dummy placeholders even after reconfiguration;
+ // if so, then this new Kryo serializer cannot read old data and is therefore incompatible
+ for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
+ if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+ LOG.warn("The Kryo registration for a previously registered class {} does not have a " +
+ "proper serializer, because its previous serializer cannot be loaded or is no " +
+ "longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());
+
+ return CompatibilityResult.requiresMigration(null);
+ }
+ }
+
+ // there's actually no way to tell if new Kryo serializers are compatible with
+ // the previous ones they overwrite; we can only signal compatibility and hope for the best
+ this.kryoRegistrations = reconfiguredRegistrations;
+ return CompatibilityResult.compatible();
}
- // this is needed for Avro but can not be added on demand.
- kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
+ }
- kryo.setRegistrationRequired(false);
- kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ return CompatibilityResult.requiresMigration(null);
+ }
+
+ public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+ private static final int VERSION = 1;
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ public KryoSerializerConfigSnapshot() {}
+
+ public KryoSerializerConfigSnapshot(
+ Class<T> typeClass,
+ LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+ super(typeClass, kryoRegistrations);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
}
}
// --------------------------------------------------------------------------------------------
- // For testing
+ // Utilities
// --------------------------------------------------------------------------------------------
-
- public Kryo getKryo() {
- checkKryoInitialized();
- return this.kryo;
+
+ /**
+ * Utility method that takes lists of registered types and their serializers, and resolves
+ * them into a single list such that the result will resemble the final registration
+ * result in Kryo.
+ */
+ private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
+ Class<?> serializedType,
+ LinkedHashSet<Class<?>> registeredTypes,
+ LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses,
+ LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {
+
+ final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();
+
+ kryoRegistrations.put(serializedType.getName(), new KryoRegistration(serializedType));
+
+ for (Class<?> registeredType : checkNotNull(registeredTypes)) {
+ kryoRegistrations.put(registeredType.getName(), new KryoRegistration(registeredType));
+ }
+
+ for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> registeredTypeWithSerializerClassEntry :
+ checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
+
+ kryoRegistrations.put(
+ registeredTypeWithSerializerClassEntry.getKey().getName(),
+ new KryoRegistration(
+ registeredTypeWithSerializerClassEntry.getKey(),
+ registeredTypeWithSerializerClassEntry.getValue()));
+ }
+
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
+ checkNotNull(registeredTypesWithSerializers).entrySet()) {
+
+ kryoRegistrations.put(
+ registeredTypeWithSerializerEntry.getKey().getName(),
+ new KryoRegistration(
+ registeredTypeWithSerializerEntry.getKey(),
+ registeredTypeWithSerializerEntry.getValue()));
+ }
+
+ kryoRegistrations.put(
+ GenericData.Array.class.getName(),
+ new KryoRegistration(
+ GenericData.Array.class,
+ new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+
+ return kryoRegistrations;
}
- @Override
- public boolean canRestoreFrom(TypeSerializer<?> other) {
- if (other instanceof KryoSerializer) {
- KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+ // --------------------------------------------------------------------------------------------
- // we cannot include the Serializers here because they don't implement the equals method
- return other.canEqual(this) &&
- type == otherKryo.type &&
- (registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) &&
- (registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
- (defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty());
- } else {
- return false;
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ // kryoRegistrations may be null if this Kryo serializer is deserialized from an old version
+ if (kryoRegistrations == null) {
+ this.kryoRegistrations = buildKryoRegistrations(
+ type,
+ registeredTypes,
+ registeredTypesWithSerializerClasses,
+ registeredTypesWithSerializers);
}
}
+
+ // --------------------------------------------------------------------------------------------
+ // For testing
+ // --------------------------------------------------------------------------------------------
+
+    /**
+     * Returns the Kryo instance used by this serializer, after calling
+     * {@code checkKryoInitialized()}. Intended for tests only.
+     */
+    @VisibleForTesting
+    public Kryo getKryo() {
+        checkKryoInitialized();
+
+        final Kryo instance = this.kryo;
+        return instance;
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 91c6145..a846703 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -31,6 +32,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -85,6 +89,38 @@ public abstract class SerializerTestBase<T> extends TestLogger {
fail("Exception in test: " + e.getMessage());
}
}
+
+    /**
+     * Verifies that the class of the serializer's configuration snapshot can be instantiated
+     * via {@code InstantiationUtil.instantiate}.
+     */
+    @Test
+    public void testConfigSnapshotInstantiation() {
+        final Class<? extends TypeSerializerConfigSnapshot> snapshotClass =
+            getSerializer().snapshotConfiguration().getClass();
+
+        InstantiationUtil.instantiate(snapshotClass);
+    }
+
+    /**
+     * Round-trips the serializer's configuration snapshot through its binary form and checks
+     * that the serializer is compatible with the restored snapshot, but requires migration
+     * for an unrelated snapshot type.
+     */
+    @Test
+    public void testSnapshotConfigurationAndReconfigure() throws Exception {
+        final TypeSerializerConfigSnapshot originalSnapshot = getSerializer().snapshotConfiguration();
+
+        // write the snapshot to bytes
+        final byte[] snapshotBytes;
+        try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream()) {
+            TypeSerializerUtil.writeSerializerConfigSnapshot(
+                new DataOutputViewStreamWrapper(bytesOut), originalSnapshot);
+            snapshotBytes = bytesOut.toByteArray();
+        }
+
+        // read it back
+        final TypeSerializerConfigSnapshot restoredSnapshot;
+        try (ByteArrayInputStream bytesIn = new ByteArrayInputStream(snapshotBytes)) {
+            restoredSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+                new DataInputViewStreamWrapper(bytesIn), Thread.currentThread().getContextClassLoader());
+        }
+
+        assertFalse(getSerializer().ensureCompatibility(restoredSnapshot).requiresMigration());
+
+        // also verify that the serializer's reconfigure implementation detects incompatibility
+        assertTrue(
+            getSerializer()
+                .ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot())
+                .requiresMigration());
+    }
@Test
public void testGetLength() {
@@ -477,4 +513,21 @@ public abstract class SerializerTestBase<T> extends TestLogger {
}
}
}
+
+    /**
+     * Configuration snapshot used by {@code testSnapshotConfigurationAndReconfigure} as the
+     * argument for which {@code ensureCompatibility} is expected to report that migration is
+     * required. It carries no state: all instances are equal and share one hash code.
+     */
+    public static final class TestIncompatibleSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            // the class is final, so an exact class match is equivalent to an instanceof check
+            return obj != null && obj.getClass() == TestIncompatibleSerializerConfigSnapshot.class;
+        }
+
+        @Override
+        public int hashCode() {
+            // the class is final, so getClass() can only ever be this class
+            return TestIncompatibleSerializerConfigSnapshot.class.hashCode();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
new file mode 100644
index 0000000..0783bb6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests related to {@link TypeSerializerConfigSnapshot}.
+ */
+public class TypeSerializerConfigSnapshotTest {
+
+    /**
+     * Verifies that reading and writing configuration snapshots work correctly: two
+     * snapshots are written in one call, read back, and compared against the originals.
+     */
+    @Test
+    public void testSerializeConfigurationSnapshots() throws Exception {
+        TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo");
+        TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar");
+
+        byte[] serializedConfig;
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            TypeSerializerUtil.writeSerializerConfigSnapshots(
+                new DataOutputViewStreamWrapper(out),
+                configSnapshot1,
+                configSnapshot2);
+
+            serializedConfig = out.toByteArray();
+        }
+
+        TypeSerializerConfigSnapshot[] restoredConfigs;
+        try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+            restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots(
+                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+        }
+
+        assertEquals(2, restoredConfigs.length);
+        assertEquals(configSnapshot1, restoredConfigs[0]);
+        assertEquals(configSnapshot2, restoredConfigs[1]);
+    }
+
+    /**
+     * Verifies that deserializing a config snapshot fails with an {@link IOException} if the
+     * config snapshot class cannot be found by the class loader used for reading.
+     */
+    @Test
+    public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
+        byte[] serializedConfig;
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            TypeSerializerUtil.writeSerializerConfigSnapshot(
+                new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar"));
+            serializedConfig = out.toByteArray();
+        }
+
+        try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+            // read using a dummy classloader: no URLs and no parent, so it cannot see the test classes
+            TypeSerializerUtil.readSerializerConfigSnapshot(
+                new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
+            fail("Expected a ClassNotFoundException wrapped in IOException");
+        } catch (IOException expected) {
+            // test passes
+        }
+    }
+
+    /**
+     * Minimal config snapshot holding an int and a string, used to exercise the snapshot
+     * write/read path. The public no-argument constructor leaves {@code msg} unset
+     * (presumably it exists so that the read path can instantiate the class before calling
+     * {@link #read(DataInputView)}); {@code equals} and {@code hashCode} therefore tolerate
+     * a {@code null} message.
+     */
+    public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+        static final int VERSION = 1;
+
+        private int val;
+        private String msg;
+
+        public TestConfigSnapshot() {}
+
+        public TestConfigSnapshot(int val, String msg) {
+            this.val = val;
+            this.msg = msg;
+        }
+
+        /** Writes the base class data followed by {@code val} and {@code msg}. */
+        @Override
+        public void write(DataOutputView out) throws IOException {
+            super.write(out);
+            out.writeInt(val);
+            out.writeUTF(msg);
+        }
+
+        /** Reads the base class data followed by {@code val} and {@code msg}, mirroring {@link #write(DataOutputView)}. */
+        @Override
+        public void read(DataInputView in) throws IOException {
+            super.read(in);
+            val = in.readInt();
+            msg = in.readUTF();
+        }
+
+        @Override
+        public int getVersion() {
+            return VERSION;
+        }
+
+        /** Two snapshots are equal if both their {@code val} and {@code msg} match; a {@code null} message only equals another {@code null} message. */
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            // instanceof is false for null, so this also covers the null case
+            if (!(obj instanceof TestConfigSnapshot)) {
+                return false;
+            }
+
+            TestConfigSnapshot other = (TestConfigSnapshot) obj;
+            // msg is null for default-constructed instances, so it must not be dereferenced blindly
+            return val == other.val && (msg == null ? other.msg == null : msg.equals(other.msg));
+        }
+
+        /** Consistent with {@link #equals(Object)}; a {@code null} message contributes 0. */
+        @Override
+        public int hashCode() {
+            return 31 * val + (msg == null ? 0 : msg.hashCode());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 5e2e733..5c615de 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -18,11 +18,25 @@
package org.apache.flink.api.common.typeutils.base;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class EnumSerializerTest extends TestLogger {
@Test
@@ -41,6 +55,132 @@ public class EnumSerializerTest extends TestLogger {
new EnumSerializer<>(EmptyEnum.class);
}
+    /**
+     * Verifies that after {@code ensureCompatibility} is called with a snapshot of an earlier
+     * constant order, the serializer keeps those constants at their earlier positions and
+     * places the remaining constants after them.
+     */
+    @Test
+    public void testReconfiguration() {
+        // the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+        final PublicEnum[] declaredConstants = {
+            PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
+            PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
+
+        // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+        final PublicEnum[] previousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+        final EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+        // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+        for (PublicEnum constant : declaredConstants) {
+            assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
+        }
+
+        // reconfigure and verify compatibility
+        final CompatibilityResult<PublicEnum> reconfigureResult = serializer.ensureCompatibility(
+            new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, previousOrder));
+        assertFalse(reconfigureResult.requiresMigration());
+
+        // after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL,
+        // followed by the "new enum constants" FOO, PETER, EMMA
+        final PublicEnum[] expectedOrder = {
+            PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL,
+            PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+        for (int position = 0; position < expectedOrder.length; position++) {
+            assertEquals(position, serializer.getValueToOrdinal().get(expectedOrder[position]).intValue());
+        }
+
+        assertTrue(Arrays.equals(expectedOrder, serializer.getValues()));
+    }
+
+    /**
+     * Writes the serializer's configuration snapshot, reads it back, and verifies that the
+     * serializer reports the restored snapshot as not requiring migration and still maps
+     * every constant to its own ordinal.
+     */
+    @Test
+    public void testConfigurationSnapshotSerialization() throws Exception {
+        final EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+        final byte[] snapshotBytes;
+        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+            TypeSerializerUtil.writeSerializerConfigSnapshot(
+                new DataOutputViewStreamWrapper(buffer), serializer.snapshotConfiguration());
+            snapshotBytes = buffer.toByteArray();
+        }
+
+        final TypeSerializerConfigSnapshot restoredSnapshot;
+        try (ByteArrayInputStream source = new ByteArrayInputStream(snapshotBytes)) {
+            restoredSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+                new DataInputViewStreamWrapper(source), Thread.currentThread().getContextClassLoader());
+        }
+
+        final CompatibilityResult<PublicEnum> restoredResult = serializer.ensureCompatibility(restoredSnapshot);
+        assertFalse(restoredResult.requiresMigration());
+
+        // the constant-to-ordinal mapping must still follow the enum's own ordinals
+        final PublicEnum[] declaredConstants = {
+            PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
+            PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
+        for (PublicEnum constant : declaredConstants) {
+            assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
+        }
+        assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+    }
+
+    /**
+     * Serializes and deserializes the serializer itself via {@link InstantiationUtil} and
+     * verifies that the constant-to-ordinal mapping and the values array are the same
+     * before and after the round trip.
+     */
+    @Test
+    public void testSerializeEnumSerializer() throws Exception {
+        final PublicEnum[] declaredConstants = {
+            PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
+            PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
+
+        EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+        // verify original transient parameters
+        for (PublicEnum constant : declaredConstants) {
+            assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
+        }
+        assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+
+        final byte[] serializerBytes = InstantiationUtil.serializeObject(serializer);
+
+        // deserialize and re-verify transient parameters
+        serializer = InstantiationUtil.deserializeObject(serializerBytes, Thread.currentThread().getContextClassLoader());
+        for (PublicEnum constant : declaredConstants) {
+            assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
+        }
+        assertTrue(Arrays.equals(PublicEnum.values(), serializer.getValues()));
+    }
+
+    /**
+     * Reconfigures the serializer with a snapshot of an earlier constant order, serializes
+     * and deserializes the serializer via {@link InstantiationUtil}, and verifies that the
+     * deserialized instance still uses the reconfigured order.
+     */
+    @Test
+    public void testSerializeReconfiguredEnumSerializer() throws Exception {
+        // the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order"
+        final PublicEnum[] declaredConstants = {
+            PublicEnum.FOO, PublicEnum.BAR, PublicEnum.PETER,
+            PublicEnum.NATHANIEL, PublicEnum.EMMA, PublicEnum.PAULA};
+
+        // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL
+        final PublicEnum[] previousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL};
+
+        EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class);
+
+        // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration)
+        for (PublicEnum constant : declaredConstants) {
+            assertEquals(constant.ordinal(), serializer.getValueToOrdinal().get(constant).intValue());
+        }
+
+        // reconfigure and verify compatibility
+        final CompatibilityResult<PublicEnum> reconfigureResult = serializer.ensureCompatibility(
+            new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, previousOrder));
+        assertFalse(reconfigureResult.requiresMigration());
+
+        // serialize and deserialize again the serializer
+        final byte[] serializerBytes = InstantiationUtil.serializeObject(serializer);
+        serializer = InstantiationUtil.deserializeObject(serializerBytes, Thread.currentThread().getContextClassLoader());
+
+        // verify that after the serializer was read, the reconfigured constant ordering is untouched
+        final PublicEnum[] expectedOrder = {
+            PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL,
+            PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA};
+
+        for (int position = 0; position < expectedOrder.length; position++) {
+            assertEquals(position, serializer.getValueToOrdinal().get(expectedOrder[position]).intValue());
+        }
+
+        assertTrue(Arrays.equals(expectedOrder, serializer.getValues()));
+    }
+
@SafeVarargs
public final <T extends Enum<T>> void testEnumSerializer(T... data) {
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
index 9fda3d0..3301aa2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.api.common.typeutils.base.array;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
/**
* A test for the {@link LongPrimitiveArraySerializer}.