You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:31 UTC

[12/15] flink git commit: [FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

[FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

Prior to this commit, when reading the PojoSerializerConfigSnapshot, if
the underlying POJO type has a missing field, then the read would fail.
Failing the deserialization of the config snapshot is too severe,
because that would leave no opportunity to restore the checkpoint at
all, whereas we should be able to restore the config and provide it to
the new PojoSerializer for the chance of getting a convert deserializer.

This commit changes this by only restoring the field names when reading
the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
the field name is used to look up the fields of the new PojoSerializer.
This change does not change the serialization format of the
PojoSerializerConfigSnapshot.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c929eb30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c929eb30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c929eb30

Branch: refs/heads/master
Commit: c929eb30867bb1f539c98fe9e47f91790bd85764
Parents: 7c157d6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 12:30:58 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 65 +++++++-------------
 .../typeutils/runtime/PojoSerializerTest.java   | 18 +++---
 2 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 2311158..7818897 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
 
 					int i = 0;
-					for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
+					for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
 							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
 						if (fieldIndex != -1) {
-							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
+							reorderedFields[i] = fields[fieldIndex];
 
 							compatResult = CompatibilityUtil.resolveCompatibilityResult(
-									fieldToConfigSnapshotEntry.getValue().f0,
-									UnloadableDummyTypeSerializer.class,
-									fieldToConfigSnapshotEntry.getValue().f1,
-									fieldSerializers[fieldIndex]);
+								fieldToConfigSnapshotEntry.getValue().f0,
+								UnloadableDummyTypeSerializer.class,
+								fieldToConfigSnapshotEntry.getValue().f1,
+								fieldSerializers[fieldIndex]);
 
 							if (compatResult.isRequiresMigration()) {
 								requiresMigration = true;
@@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		 * may reorder the fields in case they are different. The order of the fields need to
 		 * stay the same for binary compatibility, as the field order is part of the serialization format.
 		 */
-		private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
+		private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
 		/**
 		 * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots.
@@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
@@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots,
 				boolean ignoreTypeSerializerSerialization) {
@@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 				// --- write fields and their serializers, in order
 
 				out.writeInt(fieldToSerializerConfigSnapshot.size());
-				for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 						: fieldToSerializerConfigSnapshot.entrySet()) {
 
-					outViewWrapper.writeUTF(entry.getKey().getName());
+					outViewWrapper.writeUTF(entry.getKey());
 
 					out.writeInt(outWithPos.getPosition());
 					if (!ignoreTypeSerializerSerialization) {
@@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 				this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
 				String fieldName;
-				Field field;
 				TypeSerializer<?> fieldSerializer;
 				TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
 				for (int i = 0; i < numFields; i++) {
 					fieldName = inViewWrapper.readUTF();
 
-					// search all superclasses for the field
-					Class<?> clazz = getTypeClass();
-					field = null;
-					while (clazz != null) {
-						try {
-							field = clazz.getDeclaredField(fieldName);
-							field.setAccessible(true);
-							break;
-						} catch (NoSuchFieldException e) {
-							clazz = clazz.getSuperclass();
-						}
-					}
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
 
-					if (field == null) {
-						// the field no longer exists in the POJO
-						throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
-					} else {
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-						fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
-
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-						fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+					fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
 
-						fieldToSerializerConfigSnapshot.put(
-							field,
-							new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
-					}
+					fieldToSerializerConfigSnapshot.put(
+						fieldName,
+						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
 				}
 
 				// --- read registered subclasses and their serializers, in registration order
@@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			return VERSION;
 		}
 
-		public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
+		public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
 			return fieldToSerializerConfigSnapshot;
 		}
 
@@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	 * Finds and returns the order (0-based) of a POJO field.
 	 * Returns -1 if the field does not exist for this POJO.
 	 */
-	private int findField(Field f) {
+	private int findField(String fieldName) {
 		int foundIndex = 0;
 		for (Field field : fields) {
-			if (f.equals(field)) {
+			if (fieldName.equals(field.getName())) {
 				return foundIndex;
 			}
 
@@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			TypeSerializer<?>[] fieldSerializers,
 			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
 
-		final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
+		final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
 			new LinkedHashMap<>(fields.length);
 
 		for (int i = 0; i < fields.length; i++) {
 			fieldToSerializerConfigSnapshots.put(
-				fields[i],
+				fields[i].getName(),
 				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 10f4708..e5315aa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -484,35 +484,35 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		// creating this serializer just for generating config snapshots of the field serializers
 		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
 			new LinkedHashMap<>(mockOriginalFieldOrder.length);
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[0],
+			mockOriginalFieldOrder[0].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[3],
 				ser.getFieldSerializers()[3].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[1],
+			mockOriginalFieldOrder[1].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[2],
 				ser.getFieldSerializers()[2].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[2],
+			mockOriginalFieldOrder[2].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[5],
 				ser.getFieldSerializers()[5].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[3],
+			mockOriginalFieldOrder[3].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[0],
 				ser.getFieldSerializers()[0].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[4],
+			mockOriginalFieldOrder[4].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[1],
 				ser.getFieldSerializers()[1].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[5],
+			mockOriginalFieldOrder[5].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[4],
 				ser.getFieldSerializers()[4].snapshotConfiguration()));
@@ -579,9 +579,9 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			PojoSerializer.PojoSerializerConfigSnapshot<?> original,
 			PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
 				original.getFieldToSerializerConfigSnapshot();
-		for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 			Assert.assertEquals(null, entry.getValue().f0);