You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:30 UTC

[11/15] flink git commit: [FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields

[FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields

Prior to this commit, deserializing the PojoSerializer would fail when
it encountered a field that had existed in the POJO type before but has
since been removed. It is actually perfectly fine to have a missing
field; the deserialized PojoSerializer should simply skip reading the
removed field's previously serialized values, much like how Java Object
Serialization works.

This commit relaxes the deserialization of the PojoSerializer so that
null is used as a placeholder for a field that previously existed but
has since been removed. The de-/serialization and copying methods of the
PojoSerializer respect null Fields and simply skip them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae285f9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae285f9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae285f9b

Branch: refs/heads/master
Commit: ae285f9bd5398fe4d8d86eb3207bbc5beb8a24c8
Parents: c929eb3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 20:41:59 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/FieldSerializer.java |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  | 109 +++++++++++--------
 2 files changed, 64 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
index 56a4445..5d23b91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -51,7 +51,7 @@ public class FieldSerializer {
 				clazz = clazz.getSuperclass();
 			}
 		}
-		throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-				+ " (" + fieldName + ")");
+
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7818897..6a67428 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -204,10 +204,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	protected void initializeFields(T t) {
 		for (int i = 0; i < numFields; i++) {
-			try {
-				fields[i].set(t, fieldSerializers[i].createInstance());
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Cannot initialize fields.", e);
+			if (fields[i] != null) {
+				try {
+					fields[i].set(t, fieldSerializers[i].createInstance());
+				} catch (IllegalAccessException e) {
+					throw new RuntimeException("Cannot initialize fields.", e);
+				}
 			}
 		}
 	}
@@ -231,13 +233,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			// no subclass
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object copy = fieldSerializers[i].copy(value);
-						fields[i].set(target, copy);
-					}
-					else {
-						fields[i].set(target, null);
+					if (fields[i] != null) {
+						Object value = fields[i].get(from);
+						if (value != null) {
+							Object copy = fieldSerializers[i].copy(value);
+							fields[i].set(target, copy);
+						} else {
+							fields[i].set(target, null);
+						}
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -268,20 +271,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		if (actualType == clazz) {
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object reuseValue = fields[i].get(reuse);
-						Object copy;
-						if(reuseValue != null) {
-							copy = fieldSerializers[i].copy(value, reuseValue);
-						}
-						else {
-							copy = fieldSerializers[i].copy(value);
+					if (fields[i] != null) {
+						Object value = fields[i].get(from);
+						if (value != null) {
+							Object reuseValue = fields[i].get(reuse);
+							Object copy;
+							if (reuseValue != null) {
+								copy = fieldSerializers[i].copy(value, reuseValue);
+							} else {
+								copy = fieldSerializers[i].copy(value);
+							}
+							fields[i].set(reuse, copy);
+						} else {
+							fields[i].set(reuse, null);
 						}
-						fields[i].set(reuse, copy);
-					}
-					else {
-						fields[i].set(reuse, null);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -342,7 +345,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		if ((flags & NO_SUBCLASS) != 0) {
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object o = fields[i].get(value);
+					Object o = (fields[i] != null) ? fields[i].get(value) : null;
 					if (o == null) {
 						target.writeBoolean(true); // null field handling
 					} else {
@@ -400,11 +403,17 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			try {
 				for (int i = 0; i < numFields; i++) {
 					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(target, null);
-					} else {
-						Object field = fieldSerializers[i].deserialize(source);
-						fields[i].set(target, field);
+
+					if (fields[i] != null) {
+						if (isNull) {
+							fields[i].set(target, null);
+						} else {
+							Object field = fieldSerializers[i].deserialize(source);
+							fields[i].set(target, field);
+						}
+					} else if (!isNull) {
+						// read and dump a pre-existing field value
+						fieldSerializers[i].deserialize(source);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -465,20 +474,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			try {
 				for (int i = 0; i < numFields; i++) {
 					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(reuse, null);
-					} else {
-						Object field;
 
-						Object reuseField = fields[i].get(reuse);
-						if(reuseField != null) {
-							field = fieldSerializers[i].deserialize(reuseField, source);
-						}
-						else {
-							field = fieldSerializers[i].deserialize(source);
-						}
+					if (fields[i] != null) {
+						if (isNull) {
+							fields[i].set(reuse, null);
+						} else {
+							Object field;
 
-						fields[i].set(reuse, field);
+							Object reuseField = fields[i].get(reuse);
+							if (reuseField != null) {
+								field = fieldSerializers[i].deserialize(reuseField, source);
+							} else {
+								field = fieldSerializers[i].deserialize(source);
+							}
+
+							fields[i].set(reuse, field);
+						}
+					} else if (!isNull) {
+						// read and dump a pre-existing field value
+						fieldSerializers[i].deserialize(source);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -1012,8 +1026,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	// --------------------------------------------------------------------------------------------
 
-	private void writeObject(ObjectOutputStream out)
-		throws IOException, ClassNotFoundException {
+	private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
 		out.defaultWriteObject();
 		out.writeInt(fields.length);
 		for (Field field: fields) {
@@ -1021,12 +1034,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		}
 	}
 
-	private void readObject(ObjectInputStream in)
-		throws IOException, ClassNotFoundException {
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
 		int numFields = in.readInt();
 		fields = new Field[numFields];
 		for (int i = 0; i < numFields; i++) {
+			// the deserialized Field may be null if the field no longer exists in the POJO;
+			// in this case, when de-/serializing and copying instances using this serializer
+			// instance, the missing fields will simply be skipped
 			fields[i] = FieldSerializer.deserializeField(in);
 		}
 
@@ -1128,7 +1143,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	private int findField(String fieldName) {
 		int foundIndex = 0;
 		for (Field field : fields) {
-			if (fieldName.equals(field.getName())) {
+			if (field != null && fieldName.equals(field.getName())) {
 				return foundIndex;
 			}