You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/08 08:00:49 UTC

[flink] branch release-1.9 updated (15b45e7 -> 5c09e1c)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 15b45e7  [FLINK-13225][hive] Add hive function it case using blink-planner
     new c26ddf8  [FLINK-13159] Fix the NPE when PojoSerializer restored
     new 155f8c7  [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
     new 5c09e1c  [FLINK-13159] [tests] Add subclass serialization cases to PojoSerializerSnapshotMigrationTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java |   3 +-
 .../typeutils/runtime/PojoSerializerSnapshot.java  |  22 +++++++----
 .../PojoSerializerSnapshotMigrationTest.java       |  41 +++++++++++++++++++++
 ...nk-1.6-pojo-registered-subclass-serializer-data | Bin 0 -> 350 bytes
 ....6-pojo-registered-subclass-serializer-snapshot | Bin 0 -> 9236 bytes
 ...-1.6-pojo-unregistered-subclass-serializer-data | Bin 0 -> 1320 bytes
 ...-pojo-unregistered-subclass-serializer-snapshot | Bin 0 -> 8890 bytes
 ...nk-1.7-pojo-registered-subclass-serializer-data | Bin 0 -> 350 bytes
 ....7-pojo-registered-subclass-serializer-snapshot | Bin 0 -> 12183 bytes
 ...-1.7-pojo-unregistered-subclass-serializer-data | Bin 0 -> 1320 bytes
 ...-pojo-unregistered-subclass-serializer-snapshot | Bin 0 -> 11832 bytes
 11 files changed, 58 insertions(+), 8 deletions(-)
 create mode 100644 flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-snapshot


[flink] 03/03: [FLINK-13159] [tests] Add subclass serialization cases to PojoSerializerSnapshotMigrationTest

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c09e1c29897e7c098ee55c0d7881c5d2016d94c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Aug 7 11:43:02 2019 +0200

    [FLINK-13159] [tests] Add subclass serialization cases to PojoSerializerSnapshotMigrationTest
    
    This closes #9375.
---
 .../PojoSerializerSnapshotMigrationTest.java       |  41 +++++++++++++++++++++
 ...nk-1.6-pojo-registered-subclass-serializer-data | Bin 0 -> 350 bytes
 ....6-pojo-registered-subclass-serializer-snapshot | Bin 0 -> 9236 bytes
 ...-1.6-pojo-unregistered-subclass-serializer-data | Bin 0 -> 1320 bytes
 ...-pojo-unregistered-subclass-serializer-snapshot | Bin 0 -> 8890 bytes
 ...nk-1.7-pojo-registered-subclass-serializer-data | Bin 0 -> 350 bytes
 ....7-pojo-registered-subclass-serializer-snapshot | Bin 0 -> 12183 bytes
 ...-1.7-pojo-unregistered-subclass-serializer-data | Bin 0 -> 1320 bytes
 ...-pojo-unregistered-subclass-serializer-snapshot | Bin 0 -> 11832 bytes
 9 files changed, 41 insertions(+)

diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
index 6dfcb64..fa83f05 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
@@ -31,6 +31,7 @@ import org.junit.runners.Parameterized;
 import java.util.Collection;
 
 import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs;
+import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -84,6 +85,22 @@ public class PojoSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 		}
 	}
 
+	/**
+	 * The test data files of the test specifications that use this type
+	 * were written with {@code PojoSerializer}s that were created
+	 * for the base class {@code TestPojo}.
+	 */
+	public static class TestPojoSubclass extends TestPojo {
+		public String githubId;
+
+		public TestPojoSubclass() {}
+
+		public TestPojoSubclass(int id, String name, int age, String githubId) {
+			super(id, name, age);
+			this.githubId = githubId;
+		}
+	}
+
 	public PojoSerializerSnapshotMigrationTest(TestSpecification<Object> testSpecification) {
 		super(testSpecification);
 	}
@@ -107,6 +124,19 @@ public class PojoSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 			PojoSerializerSnapshotMigrationTest::testPojoWithNewAndRemovedFieldsSerializerSupplier,
 			hasSameCompatibilityAs(TypeSerializerSchemaCompatibility.compatibleAfterMigration()));
 
+		testSpecifications.addWithCompatibilityMatcher(
+			"pojo-unregistered-subclass-serializer",
+			PojoSerializer.class,
+			PojoSerializerSnapshot.class,
+			PojoSerializerSnapshotMigrationTest::testPojoSerializerSupplier,
+			isCompatibleWithReconfiguredSerializer());
+
+		testSpecifications.add(
+			"pojo-registered-subclass-serializer",
+			PojoSerializer.class,
+			PojoSerializerSnapshot.class,
+			PojoSerializerSnapshotMigrationTest::testPojoSerializerWithSubclassRegisteredSupplier);
+
 		return testSpecifications.get();
 	}
 
@@ -125,4 +155,15 @@ public class PojoSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM
 		assertTrue(serializer instanceof PojoSerializer);
 		return serializer;
 	}
+
+	private static TypeSerializer<TestPojo> testPojoSerializerWithSubclassRegisteredSupplier() {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerPojoType(TestPojoSubclass.class);
+
+		TypeSerializer<TestPojo> serializer =
+			TypeExtractor.createTypeInfo(TestPojo.class).createSerializer(executionConfig);
+
+		assertTrue(serializer instanceof PojoSerializer);
+		return serializer;
+	}
 }
diff --git a/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-data b/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-data
new file mode 100644
index 0000000..ca0b187
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-snapshot
new file mode 100644
index 0000000..4bb2055
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-pojo-registered-subclass-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-data b/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-data
new file mode 100644
index 0000000..3450dd4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-snapshot
new file mode 100644
index 0000000..9231146
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-pojo-unregistered-subclass-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-data b/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-data
new file mode 100644
index 0000000..ca0b187
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-snapshot
new file mode 100644
index 0000000..ca6c054
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-pojo-registered-subclass-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-data b/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-data
new file mode 100644
index 0000000..3450dd4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-snapshot
new file mode 100644
index 0000000..99a7482
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-pojo-unregistered-subclass-serializer-snapshot differ


[flink] 01/03: [FLINK-13159] Fix the NPE when PojoSerializer restored

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c26ddf831520a5b6034ccf57169eb70eb4f7c47f
Author: Yun Tang <my...@live.com>
AuthorDate: Wed Aug 7 02:07:33 2019 +0800

    [FLINK-13159] Fix the NPE when PojoSerializer restored
---
 .../org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index dcc0a72..5c43d1e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -123,7 +123,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			this.fields[i].setAccessible(true);
 		}
 
-		cl = Thread.currentThread().getContextClassLoader();
+		this.cl = Thread.currentThread().getContextClassLoader();
 
 		// We only want those classes that are not our own class and are actually sub-classes.
 		LinkedHashSet<Class<?>> registeredSubclasses =
@@ -156,6 +156,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		this.registeredSerializers = checkNotNull(registeredSerializers);
 		this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
 		this.executionConfig = checkNotNull(executionConfig);
+		this.cl = Thread.currentThread().getContextClassLoader();
 	}
 	
 	@Override


[flink] 02/03: [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 155f8c7278de175a77f757d98301178b57bc421c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Aug 8 07:34:04 2019 +0200

    [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
---
 .../typeutils/runtime/PojoSerializerSnapshot.java  | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
index 9987fae..95276d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
@@ -474,13 +474,21 @@ public class PojoSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
 		Iterator<TypeSerializer<?>> serializersForPreexistingRegistrations =
 			Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator();
 
-		for (Map.Entry<Class<?>, TypeSerializer<?>> registration : newSubclassRegistrations.entrySet()) {
-			// new registrations should simply be appended to the subclass serializer registry with their new serializers;
-			// preexisting registrations should use the compatibility-checked serializer
-			TypeSerializer<?> newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey()))
-				? serializersForPreexistingRegistrations.next()
-				: registration.getValue();
-			reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration);
+		// first, replace all restored serializers of subclasses that co-exist in
+		// the previous and new registrations, with the compatibility-checked serializers
+		for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) {
+			if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) {
+				oldRegistration.setValue(serializersForPreexistingRegistrations.next());
+			}
+		}
+
+		// then, for each new registration that did not exist before, simply append it to the registry with its
+		// new serializer
+		for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : newSubclassRegistrations.entrySet()) {
+			TypeSerializer<?> oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey());
+			if (oldRegistration == null) {
+				reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue());
+			}
 		}
 
 		return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);