You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:02 UTC
[4/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades
for managed state
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 4712ed1..5459d53 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -18,29 +18,43 @@
package org.apache.flink.api.java.typeutils.runtime;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* A test for the {@link PojoSerializer}.
*/
@@ -191,6 +205,20 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
return true;
}
}
+
+ /** Subclass of {@link TestUserClass} that adds an {@code int} and a {@code String} field. */
+ public static class SubTestUserClassA extends TestUserClass {
+ public int subDumm1;
+ public String subDumm2;
+
+ // public no-argument constructor
+ public SubTestUserClassA() {}
+ }
+
+ /**
+ * Subclass of {@link TestUserClass} that adds a {@code Double} and a {@code float} field.
+ * The field names match those of {@link SubTestUserClassA}, but the field types differ.
+ */
+ public static class SubTestUserClassB extends TestUserClass {
+ public Double subDumm1;
+ public float subDumm2;
+
+ // public no-argument constructor
+ public SubTestUserClassB() {}
+ }
/**
* This tests if the hashes returned by the pojo and tuple comparators are the same
@@ -240,4 +268,244 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
Assert.assertTrue("The hashing for tuples and pojos must be the same, so that they are mixable. Also for those with multiple key fields", multiPojoHash == multiTupleHash);
}
-}
+
+ // --------------------------------------------------------------------------------------------
+ // Configuration snapshotting & reconfiguring tests
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Verifies that checking compatibility against a config snapshot taken from a preceding
+ * POJO serializer for a different POJO type reports that migration is required.
+ */
+ @Test
+ public void testReconfigureWithDifferentPojoType() throws Exception {
+ PojoSerializer<SubTestUserClassB> pojoSerializer1 = (PojoSerializer<SubTestUserClassB>)
+ TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig());
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // the "new" serializer is created for a different POJO type (A instead of B)
+ PojoSerializer<SubTestUserClassA> pojoSerializer2 = (PojoSerializer<SubTestUserClassA>)
+ TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new ExecutionConfig());
+
+ // read configuration again from bytes
+ try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ // the snapshot of the serializer for B must not be accepted as-is by the serializer for A
+ CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
+ assertTrue(compatResult.requiresMigration());
+ }
+
+ /**
+ * Tests that ensuring compatibility with a config snapshot taken under a different subclass
+ * registration order restores the subclass registration ids (tags) of the original serializer.
+ */
+ @Test
+ public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.registerPojoType(SubTestUserClassA.class);
+ executionConfig.registerPojoType(SubTestUserClassB.class);
+
+ PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig);
+
+ // get original registration ids
+ int subClassATag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class);
+ int subClassBTag = pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class);
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // use new config and instantiate new PojoSerializer
+ executionConfig = new ExecutionConfig();
+ executionConfig.registerPojoType(SubTestUserClassB.class); // test with B registered before A
+ executionConfig.registerPojoType(SubTestUserClassA.class);
+
+ pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(executionConfig);
+
+ // read configuration from bytes
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ // the changed registration order alone must not require a migration
+ CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+ assertFalse(compatResult.requiresMigration());
+
+ // after the compatibility check, the registration ids must be the same as in the
+ // original serializer, even though the new config registered B before A
+ assertEquals(subClassATag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue());
+ assertEquals(subClassBTag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue());
+ }
+
+ /**
+ * Tests that ensuring compatibility with a previous config snapshot repopulates the
+ * subclass serializer cache with the subclasses that were cached when the snapshot was taken.
+ */
+ @Test
+ public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() throws Exception {
+ // don't register any subclasses
+ PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+ // create cached serializers for SubTestUserClassA and SubTestUserClassB
+ pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+ pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+ // sanity check: both subclasses are in the cache before the snapshot is taken
+ assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // instantiate new PojoSerializer (getSubclassSerializer is not called on this instance)
+
+ pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+ // read configuration from bytes
+ try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ // check the compatibility result and that the subclass serializer cache is repopulated
+ CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+ assertFalse(compatResult.requiresMigration());
+ assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+ }
+
+ /**
+ * Tests the following scenario:
+ * - The previous Pojo serializer did not have registrations, and created cached serializers for subclasses
+ * - On restore, the new Pojo serializer has those subclasses registered
+ *
+ * In this case, after the compatibility check, the cache should be repopulated, and registrations
+ * should also exist for the subclasses.
+ *
+ * Note: the cache still needs to be repopulated because previous data of those subclasses was
+ * written with the cached serializers. In this case, the repopulated cache has reconfigured serializers
+ * for the subclasses so that previously written data can be read, but the registered serializers
+ * for the subclasses do not necessarily need to be reconfigured since they will only be used to
+ * write new data.
+ */
+ @Test
+ public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Exception {
+ // don't register any subclasses at first
+ PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+ // create cached serializers for SubTestUserClassA and SubTestUserClassB
+ pojoSerializer.getSubclassSerializer(SubTestUserClassA.class);
+ pojoSerializer.getSubclassSerializer(SubTestUserClassB.class);
+
+ // make sure serializers are in cache
+ assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+
+ // make sure that registrations are empty
+ assertTrue(pojoSerializer.getRegisteredClasses().isEmpty());
+ assertEquals(0, pojoSerializer.getRegisteredSerializers().length);
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // instantiate new PojoSerializer, with new execution config that has the subclass registrations
+ ExecutionConfig newExecutionConfig = new ExecutionConfig();
+ newExecutionConfig.registerPojoType(SubTestUserClassA.class);
+ newExecutionConfig.registerPojoType(SubTestUserClassB.class);
+ pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(newExecutionConfig);
+
+ // read configuration from bytes
+ try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ // check the compatibility result and that
+ // 1) subclass serializer cache is repopulated
+ // 2) registrations also contain the now registered subclasses
+ CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
+ assertFalse(compatResult.requiresMigration());
+ assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
+ assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
+ assertEquals(2, pojoSerializer.getRegisteredClasses().size());
+ assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class));
+ assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class));
+ }
+
+ /**
+ * Verifies that ensuring compatibility with a config snapshot that lists the POJO fields in a
+ * different order reorders the fields of the new Pojo serializer to match the snapshot's order.
+ */
+ @Test
+ public void testReconfigureWithDifferentFieldOrder() throws Exception {
+ // the field order that the mocked "previous" serializer is pretending to have used
+ Field[] mockOriginalFieldOrder = {
+ TestUserClass.class.getField("dumm4"),
+ TestUserClass.class.getField("dumm3"),
+ TestUserClass.class.getField("nestedClass"),
+ TestUserClass.class.getField("dumm1"),
+ TestUserClass.class.getField("dumm2"),
+ TestUserClass.class.getField("dumm5"),
+ };
+
+ // creating this serializer just for generating config snapshots of the field serializers
+ PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+ // Each mocked field is paired with the snapshot of the serializer at that field's index in the
+ // default field order asserted further below (dumm1=0, dumm2=1, dumm3=2, dumm4=3, dumm5=4,
+ // nestedClass=5); presumably getFieldSerializers() is index-aligned with getFields().
+ LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot =
+ new LinkedHashMap<>(mockOriginalFieldOrder.length);
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration());
+ mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration());
+
+ PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
+
+ // sanity check: the field order of the freshly created serializer, before the compatibility check
+ assertEquals(TestUserClass.class.getField("dumm1"), pojoSerializer.getFields()[0]);
+ assertEquals(TestUserClass.class.getField("dumm2"), pojoSerializer.getFields()[1]);
+ assertEquals(TestUserClass.class.getField("dumm3"), pojoSerializer.getFields()[2]);
+ assertEquals(TestUserClass.class.getField("dumm4"), pojoSerializer.getFields()[3]);
+ assertEquals(TestUserClass.class.getField("dumm5"), pojoSerializer.getFields()[4]);
+ assertEquals(TestUserClass.class.getField("nestedClass"), pojoSerializer.getFields()[5]);
+
+ PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> mockPreviousConfigSnapshot =
+ new PojoSerializer.PojoSerializerConfigSnapshot<>(
+ TestUserClass.class,
+ mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
+ new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test
+ new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test
+
+ // check the compatibility result and that fields are reordered to the previous order
+ CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(
+
+ mockPreviousConfigSnapshot);
+ assertFalse(compatResult.requiresMigration());
+ // walk the mocked order and compare position by position with the serializer's fields
+ int i = 0;
+ for (Field field : mockOriginalFieldOrder) {
+ assertEquals(field, pojoSerializer.getFields()[i]);
+ i++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
new file mode 100644
index 0000000..60c4dc4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests related to configuration snapshotting and compatibility checks for the {@link KryoSerializer}.
+ */
+public class KryoSerializerCompatibilityTest {
+
+ /**
+ * Verifies that the compatibility check reports that migration is required if the
+ * data type has changed.
+ */
+ @Test
+ public void testMigrationStrategyWithDifferentKryoType() throws Exception {
+ KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig());
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // the "new" serializer is created for a different type (B instead of A)
+ KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
+
+ // read configuration again from bytes
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
+ assertTrue(compatResult.requiresMigration());
+ }
+
+ /**
+ * Tests that after the compatibility check, registration ids are reconfigured to
+ * remain the same as in the preceding KryoSerializer.
+ */
+ @Test
+ public void testMigrationStrategyForDifferentRegistrationOrder() throws Exception {
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.registerKryoType(TestClassA.class);
+ executionConfig.registerKryoType(TestClassB.class);
+
+ KryoSerializer<TestClass> kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
+
+ // get original registration ids
+ int testClassId = kryoSerializer.getKryo().getRegistration(TestClass.class).getId();
+ int testClassAId = kryoSerializer.getKryo().getRegistration(TestClassA.class).getId();
+ int testClassBId = kryoSerializer.getKryo().getRegistration(TestClassB.class).getId();
+
+ // snapshot configuration and serialize to bytes
+ TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration();
+ byte[] serializedConfig;
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
+ serializedConfig = out.toByteArray();
+ }
+
+ // use new config and instantiate new KryoSerializer
+ executionConfig = new ExecutionConfig();
+ executionConfig.registerKryoType(TestClassB.class); // test with B registered before A
+ executionConfig.registerKryoType(TestClassA.class);
+
+ kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
+
+ // read configuration from bytes
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+
+ // check the compatibility result and that the registration ids remain the same
+ CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
+ assertFalse(compatResult.requiresMigration());
+ assertEquals(testClassId, kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
+ assertEquals(testClassAId, kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
+ assertEquals(testClassBId, kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
+ }
+
+ /** Empty type that the serializer under test is created for in the registration order test. */
+ private static class TestClass {}
+
+ /** Empty type used as a serializer target type and as a registered Kryo type. */
+ private static class TestClassA {}
+
+ /** Empty type used as a serializer target type and as a registered Kryo type. */
+ private static class TestClassB {}
+
+ /**
+ * Kryo serializer stub for {@link TestClassB} whose operations always throw.
+ * NOTE(review): not referenced by any test in this class.
+ */
+ private static class TestClassBSerializer extends Serializer<TestClassB> {
+ @Override
+ public void write(Kryo kryo, Output output, TestClassB o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TestClassB read(Kryo kryo, Input input, Class<TestClassB> aClass) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 3bb40eb..4dabaca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -26,7 +26,9 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.memory.DataInputView;
@@ -386,5 +388,15 @@ public class CollectionInputFormatTest {
public int hashCode() {
return Objects.hash(failOnRead, failOnWrite);
}
+
+ // Configuration snapshotting is not implemented for this serializer; calling it always throws.
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ // Compatibility checks are not implemented for this serializer; calling it always throws.
+ @Override
+ public CompatibilityResult<ElementType> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index 846b6c3..9e22fc2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -18,7 +18,10 @@
package org.apache.flink.cep;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -39,7 +42,8 @@ import java.util.IdentityHashMap;
*
* @param <T> Type of the element to be serialized
*/
-public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
+@Internal
+public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = -7633631762221447524L;
// underlying type serializer
@@ -192,4 +196,14 @@ public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
this.identityMap = new IdentityHashMap<>();
this.elementList = new ArrayList<>();
}
+
+ /**
+ * Not supported by this serializer.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
+
+ /**
+ * Not supported by this serializer.
+ *
+ * @param configSnapshot ignored
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index b8c4e65..70755e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,6 +21,22 @@ package org.apache.flink.cep.nfa;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -43,20 +59,6 @@ import java.util.Set;
import java.util.Stack;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.cep.NonDuplicatingTypeSerializer;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
/**
* Non-deterministic finite automaton implementation.
@@ -859,7 +861,7 @@ public class NFA<T> implements Serializable {
/**
* A {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
*/
- public static class Serializer<T> extends TypeSerializer<NFA<T>> {
+ public static class Serializer<T> extends TypeSerializerSingleton<NFA<T>> {
private static final long serialVersionUID = 1L;
@@ -869,11 +871,6 @@ public class NFA<T> implements Serializable {
}
@Override
- public TypeSerializer<NFA<T>> duplicate() {
- return this;
- }
-
- @Override
public NFA<T> createInstance() {
return null;
}
@@ -944,18 +941,8 @@ public class NFA<T> implements Serializable {
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof Serializer && ((Serializer) obj).canEqual(this);
- }
-
- @Override
public boolean canEqual(Object obj) {
return obj instanceof Serializer;
}
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b6374cd..14235dc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -21,7 +21,10 @@ package org.apache.flink.cep.operator;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -473,11 +476,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
}
@Override
- public boolean canRestoreFrom(TypeSerializer<?> other) {
- return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
- }
-
- @Override
public boolean equals(Object obj) {
if (obj instanceof PriorityQueueSerializer) {
@SuppressWarnings("unchecked")
@@ -498,6 +496,32 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
public int hashCode() {
return Objects.hash(factory, elementSerializer);
}
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+ }
+
+ @Override
+ public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+ CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+ ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+ if (!compatResult.requiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (compatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
+ }
+ }
+
+ return CompatibilityResult.requiresMigration(null);
+ }
}
private interface PriorityQueueFactory<T> extends Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index b86fe87..5984122 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.types.valuearray;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +83,10 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
public boolean canEqual(Object obj) {
return obj instanceof IntValueArraySerializer;
}
+
+ @Override
+ protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+ return super.isCompatibleSerializationFormatIdentifier(identifier)
+ || identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index 95219b6..e95a1a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.types.valuearray;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +83,10 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
public boolean canEqual(Object obj) {
return obj instanceof LongValueArraySerializer;
}
+
+ @Override
+ protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+ return super.isCompatibleSerializationFormatIdentifier(identifier)
+ || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 0e875e3..6dbe0e5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.types.valuearray;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +83,10 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
public boolean canEqual(Object obj) {
return obj instanceof StringValueArraySerializer;
}
+
+ @Override
+ protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+ return super.isCompatibleSerializationFormatIdentifier(identifier)
+ || identifier.equals(StringArraySerializer.class.getCanonicalName());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 1f56a98..7ffa57c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.runtime.types
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.Row
@@ -75,4 +75,55 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
}
override def hashCode: Int = rowSerializer.hashCode() * 13
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+ new CRowSerializer.CRowSerializerConfigSnapshot(
+ rowSerializer.snapshotConfiguration())
+ }
+
+ override def ensureCompatibility(
+ configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[CRow] = {
+
+ configSnapshot match {
+ case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
+ val compatResult = rowSerializer.ensureCompatibility(
+ crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+
+ if (compatResult.requiresMigration()) {
+ if (compatResult.getConvertDeserializer != null) {
+ CompatibilityResult.requiresMigration(
+ new CRowSerializer(compatResult.getConvertDeserializer)
+ )
+ } else {
+ CompatibilityResult.requiresMigration(null)
+ }
+ } else {
+ CompatibilityResult.compatible()
+ }
+
+ case _ => CompatibilityResult.requiresMigration(null)
+ }
+ }
+}
+
+object CRowSerializer {
+
+ class CRowSerializerConfigSnapshot(
+ private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
+ extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) {
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ def this() = this(null)
+
+ override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
+ }
+
+ object CRowSerializerConfigSnapshot {
+ val VERSION = 1
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
index c6813b6..c4e23ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
@@ -18,7 +18,10 @@
package org.apache.flink.migration;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -51,8 +54,7 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
@Override
public TypeSerializer<Serializable> duplicate() {
- throw new UnsupportedOperationException(
- "This is just a proxy used during migration until the real type serializer is provided by the user.");
+ return this;
}
@Override
@@ -103,6 +105,17 @@ public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializab
}
@Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new ParameterlessTypeSerializerConfig(getClass().getCanonicalName());
+ }
+
+ @Override
+ public CompatibilityResult<Serializable> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ // always assume compatibility since we're just a proxy for migration
+ return CompatibilityResult.compatible();
+ }
+
+ @Override
public boolean equals(Object obj) {
return obj instanceof MigrationNamespaceSerializerProxy;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
index f47989a..f58070e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMigrationRestoreStrategy.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
@@ -85,14 +85,14 @@ public abstract class AbstractMigrationRestoreStrategy<K, N, S> implements Migra
patchedNamespaceSerializer = (TypeSerializer<N>) VoidNamespaceSerializer.INSTANCE;
}
- RegisteredBackendStateMetaInfo<N, S> registeredBackendStateMetaInfo =
- new RegisteredBackendStateMetaInfo<>(
+ RegisteredKeyedBackendStateMetaInfo<N, S> registeredKeyedBackendStateMetaInfo =
+ new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.UNKNOWN,
stateName,
patchedNamespaceSerializer,
stateSerializer);
- final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredBackendStateMetaInfo);
+ final StateTable<K, N, S> stateTable = stateBackend.newStateTable(registeredKeyedBackendStateMetaInfo);
final DataInputView inView = openDataInputView();
final int keyGroup = keyGroupRange.getStartKeyGroup();
final int numNamespaces = inView.readInt();
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 0badb41..8fbc227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -17,7 +17,10 @@
*/
package org.apache.flink.runtime.state;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -132,4 +135,30 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
public int hashCode() {
return elementSerializer.hashCode();
}
-}
\ No newline at end of file
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+ }
+
+ @Override
+ public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
+ CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
+ ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+ if (!compatResult.requiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (compatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new ArrayListSerializer<>(compatResult.getConvertDeserializer()));
+ }
+ }
+
+ return CompatibilityResult.requiresMigration(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index e7ed26f..ec4aa81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -207,26 +207,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(registeredStatesDeepCopies.size());
- List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots =
new ArrayList<>(registeredStatesDeepCopies.size());
- for (Map.Entry<String, PartitionableListState<?>> entry :
- registeredStatesDeepCopies.entrySet()) {
-
- PartitionableListState<?> state = entry.getValue();
- OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
- new OperatorBackendSerializationProxy.StateMetaInfo<>(
- state.getName(),
- state.getPartitionStateSerializer(),
- state.getAssignmentMode());
- metaInfoList.add(metaInfo);
+ for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
+ metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
DataOutputView dov = new DataOutputViewStreamWrapper(out);
OperatorBackendSerializationProxy backendSerializationProxy =
- new OperatorBackendSerializationProxy(metaInfoList);
+ new OperatorBackendSerializationProxy(metaInfoSnapshots);
backendSerializationProxy.write(dov);
@@ -237,7 +229,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
PartitionableListState<?> value = entry.getValue();
long[] partitionOffsets = value.write(out);
- OperatorStateHandle.Mode mode = value.getAssignmentMode();
+ OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
@@ -254,10 +246,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return null;
}
- OperatorStateHandle operatorStateHandle =
- new OperatorStateHandle(writtenStatesMetaData, stateHandle);
-
- return operatorStateHandle;
+ return new OperatorStateHandle(writtenStatesMetaData, stateHandle);
}
};
@@ -298,25 +287,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
- List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
- backendSerializationProxy.getNamedStateSerializationProxies();
+ List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredMetaInfoSnapshots =
+ backendSerializationProxy.getStateMetaInfoSnapshots();
// Recreate all PartitionableListStates from the meta info
- for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
- PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+ for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) {
+ PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
if (null == listState) {
listState = new PartitionableListState<>(
- stateMetaInfo.getName(),
- stateMetaInfo.getStateSerializer(),
- stateMetaInfo.getMode());
+ new RegisteredOperatorBackendStateMetaInfo<>(
+ restoredMetaInfo.getName(),
+ restoredMetaInfo.getPartitionStateSerializer(),
+ restoredMetaInfo.getAssignmentMode()));
- registeredStates.put(listState.getName(), listState);
+ registeredStates.put(listState.getStateMetaInfo().getName(), listState);
} else {
- Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom(
- stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
- listState.getPartitionStateSerializer() + " is not compatible with " +
- stateMetaInfo.getStateSerializer());
+ // TODO with eager state registration in place, check here for serializer migration strategies
}
}
@@ -341,7 +328,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
/**
- *
* Implementation of operator list state.
*
* @param <S> the type of an operator state partition.
@@ -349,19 +335,9 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
static final class PartitionableListState<S> implements ListState<S> {
/**
- * The name of the state, as registered by the user
- */
- private final String name;
-
- /**
- * The type serializer for the elements in the state list
- */
- private final TypeSerializer<S> partitionStateSerializer;
-
- /**
- * The mode how elements in this state are assigned to tasks during restore
+ * Meta information of the state, including state name, assignment mode, and serializer
*/
- private final OperatorStateHandle.Mode assignmentMode;
+ private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
/**
* The internal list the holds the elements of the state
@@ -373,46 +349,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
*/
private final ArrayListSerializer<S> internalListCopySerializer;
- public PartitionableListState(
- String name,
- TypeSerializer<S> partitionStateSerializer,
- OperatorStateHandle.Mode assignmentMode) {
-
- this(name, partitionStateSerializer, assignmentMode, new ArrayList<S>());
+ public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+ this(stateMetaInfo, new ArrayList<S>());
}
private PartitionableListState(
- String name,
- TypeSerializer<S> partitionStateSerializer,
- OperatorStateHandle.Mode assignmentMode,
+ RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
ArrayList<S> internalList) {
- this.name = Preconditions.checkNotNull(name);
- this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
- this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+ this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.internalList = Preconditions.checkNotNull(internalList);
- this.internalListCopySerializer = new ArrayListSerializer<>(partitionStateSerializer);
+ this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
}
private PartitionableListState(PartitionableListState<S> toCopy) {
- this(
- toCopy.name,
- toCopy.partitionStateSerializer.duplicate(),
- toCopy.assignmentMode,
- toCopy.internalListCopySerializer.copy(toCopy.internalList));
- }
-
- public String getName() {
- return name;
- }
-
- public OperatorStateHandle.Mode getAssignmentMode() {
- return assignmentMode;
+ this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
}
- public TypeSerializer<S> getPartitionStateSerializer() {
- return partitionStateSerializer;
+ public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+ return stateMetaInfo;
}
public List<S> getInternalList() {
@@ -441,8 +397,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
@Override
public String toString() {
return "PartitionableListState{" +
- "name='" + name + '\'' +
- ", assignmentMode=" + assignmentMode +
+ "stateMetaInfo=" + stateMetaInfo +
", internalList=" + internalList +
'}';
}
@@ -456,7 +411,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
for (int i = 0; i < internalList.size(); ++i) {
S element = internalList.get(i);
partitionOffsets[i] = out.getPos();
- partitionStateSerializer.serialize(element, dov);
+ getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
}
return partitionOffsets;
@@ -466,7 +421,6 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws IOException {
-
Preconditions.checkNotNull(stateDescriptor);
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
@@ -478,23 +432,27 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
if (null == partitionableListState) {
-
partitionableListState = new PartitionableListState<>(
- name,
- partitionStateSerializer,
- mode);
+ new RegisteredOperatorBackendStateMetaInfo<>(
+ name,
+ partitionStateSerializer,
+ mode));
registeredStates.put(name, partitionableListState);
} else {
+ // TODO with eager registration in place, these checks should be moved to restore()
+
Preconditions.checkState(
- partitionableListState.getAssignmentMode().equals(mode),
- "Incompatible assignment mode. Provided: " + mode + ", expected: " +
- partitionableListState.getAssignmentMode());
+ partitionableListState.getStateMetaInfo().getName().equals(name),
+ "Incompatible state names. " +
+ "Was [" + partitionableListState.getStateMetaInfo().getName() + "], " +
+ "registered with [" + name + "].");
+
Preconditions.checkState(
- stateDescriptor.getElementSerializer().
- canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
- "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
- ", found: " + partitionableListState.getPartitionStateSerializer());
+ partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
+ "Incompatible state assignment modes. " +
+ "Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
+ "registered with [" + mode + "].");
}
return partitionableListState;
@@ -509,7 +467,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
long[] offsets = metaInfo.getOffsets();
if (null != offsets) {
DataInputView div = new DataInputViewStreamWrapper(in);
- TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
+ TypeSerializer<S> serializer = stateListForName.getStateMetaInfo().getPartitionStateSerializer();
for (long offset : offsets) {
in.seek(offset);
stateListForName.add(serializer.deserialize(div));
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index 61cc58c..d52c207 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -18,7 +18,11 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
@@ -38,7 +42,8 @@ import java.util.Map;
* @param <K> The type of the keys in the map.
* @param <V> The type of the values in the map.
*/
-public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
+@Internal
+public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
private static final long serialVersionUID = -6885593032367050078L;
@@ -190,4 +195,37 @@ public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
public int hashCode() {
return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
}
+
+ // --------------------------------------------------------------------------------------------
+ // Serializer configuration snapshotting & compatibility
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return new MapSerializerConfigSnapshot(
+ keySerializer.snapshotConfiguration(),
+ valueSerializer.snapshotConfiguration());
+ }
+
+ @Override
+ public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ if (configSnapshot instanceof MapSerializerConfigSnapshot) {
+ TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots =
+ ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots();
+
+ CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
+ CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+
+ if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) {
+ return CompatibilityResult.compatible();
+ } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
+ return CompatibilityResult.requiresMigration(
+ new HashMapSerializer<>(
+ keyCompatResult.getConvertDeserializer(),
+ valueCompatResult.getConvertDeserializer()));
+ }
+ }
+
+ return CompatibilityResult.requiresMigration(null);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 512baf6..d49b1d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
@@ -31,7 +31,7 @@ import java.io.Serializable;
@SuppressWarnings("serial")
@Internal
-final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+final class JavaSerializer<T extends Serializable> extends TypeSerializerSingleton<T> {
private static final long serialVersionUID = 5067491650263321234L;
@@ -41,11 +41,6 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
}
@Override
- public TypeSerializer<T> duplicate() {
- return this;
- }
-
- @Override
public T createInstance() {
return null;
}
@@ -98,17 +93,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof JavaSerializer;
- }
-
- @Override
public boolean canEqual(Object obj) {
return obj instanceof JavaSerializer;
}
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 5661c38..a389c4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.io.VersionMismatchException;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
@@ -33,15 +31,15 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
* serialization logic here.
*/
public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
- public static final int VERSION = 2;
+ public static final int VERSION = 3;
- private TypeSerializerSerializationProxy<?> keySerializerProxy;
- private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+ private TypeSerializer<?> keySerializer;
+ private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
private int restoredVersion;
private ClassLoader userCodeClassLoader;
@@ -50,19 +48,25 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
}
- public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
- this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
- this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ public KeyedBackendSerializationProxy(
+ TypeSerializer<?> keySerializer,
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
+
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+
+ Preconditions.checkNotNull(stateMetaInfoSnapshots);
+ Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
+ this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+
this.restoredVersion = VERSION;
- Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
}
- public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
- return namedStateSerializationProxies;
+ public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> getStateMetaInfoSnapshots() {
+ return stateMetaInfoSnapshots;
}
- public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
- return keySerializerProxy;
+ public TypeSerializer<?> getKeySerializer() {
+ return keySerializer;
}
@Override
@@ -82,20 +86,22 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
@Override
public boolean isCompatibleVersion(int version) {
- // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
- return super.isCompatibleVersion(version) || version == 1;
+ // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
+ return super.isCompatibleVersion(version) || version == 2 || version == 1;
}
@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
- keySerializerProxy.write(out);
+ new TypeSerializerSerializationProxy<>(keySerializer).write(out);
- out.writeShort(namedStateSerializationProxies.size());
+ out.writeShort(stateMetaInfoSnapshots.size());
- for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) {
- kvState.write(out);
+ for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) {
+ KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(VERSION, metaInfo)
+ .writeStateMetaInfo(out);
}
}
@@ -103,132 +109,18 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public void read(DataInputView in) throws IOException {
super.read(in);
- keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ final TypeSerializerSerializationProxy<?> keySerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
keySerializerProxy.read(in);
+ this.keySerializer = keySerializerProxy.getTypeSerializer();
int numKvStates = in.readShort();
- namedStateSerializationProxies = new ArrayList<>(numKvStates);
- for (int i = 0; i < numKvStates; ++i) {
- StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
- stateSerializationProxy.read(in);
- namedStateSerializationProxies.add(stateSerializationProxy);
- }
- }
-
- //----------------------------------------------------------------------------------------------------------------------
-
- /**
- * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
- * keyed backend.
- */
- public static class StateMetaInfo<N, S> implements IOReadableWritable {
-
- private StateDescriptor.Type stateType;
- private String stateName;
- private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy;
- private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy;
-
- private ClassLoader userClassLoader;
-
- StateMetaInfo(ClassLoader userClassLoader) {
- this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
- }
-
- public StateMetaInfo(
- StateDescriptor.Type stateType,
- String name,
- TypeSerializer<N> namespaceSerializer,
- TypeSerializer<S> stateSerializer) {
-
- this.stateType = Preconditions.checkNotNull(stateType);
- this.stateName = Preconditions.checkNotNull(name);
- this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
- this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
- }
-
- public StateDescriptor.Type getStateType() {
- return stateType;
- }
-
- public void setStateType(StateDescriptor.Type stateType) {
- this.stateType = stateType;
- }
-
- public String getStateName() {
- return stateName;
- }
-
- public void setStateName(String stateName) {
- this.stateName = stateName;
- }
-
- public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() {
- return namespaceSerializerSerializationProxy;
- }
-
- public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) {
- this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy;
- }
-
- public TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() {
- return stateSerializerSerializationProxy;
- }
-
- public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) {
- this.stateSerializerSerializationProxy = stateSerializerSerializationProxy;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeInt(getStateType().ordinal());
- out.writeUTF(getStateName());
-
- getNamespaceSerializerSerializationProxy().write(out);
- getStateSerializerSerializationProxy().write(out);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- int enumOrdinal = in.readInt();
- setStateType(StateDescriptor.Type.values()[enumOrdinal]);
- setStateName(in.readUTF());
-
- namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
- namespaceSerializerSerializationProxy.read(in);
-
- stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
- stateSerializerSerializationProxy.read(in);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
-
- if (!getStateName().equals(that.getStateName())) {
- return false;
- }
-
- if (!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())) {
- return false;
- }
-
- return getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
- }
-
- @Override
- public int hashCode() {
- int result = getStateName().hashCode();
- result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode();
- result = 31 * result + getStateSerializerSerializationProxy().hashCode();
- return result;
+ stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+ for (int i = 0; i < numKvStates; i++) {
+ stateMetaInfoSnapshots.add(
+ KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(restoredVersion, userCodeClassLoader)
+ .readStateMetaInfo(in));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..83aa335
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here to document the history of the serialization format.
+ */
+public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
+
+ // -------------------------------------------------------------------------------
+ // Writers
+ // - v1, v2: Flink 1.2.x
+ // - v3: Flink 1.3.x
+ // -------------------------------------------------------------------------------
+
+ public static <N, S> KeyedBackendStateMetaInfoWriter getWriterForVersion(
+ int version, RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+
+ switch (version) {
+ case 1:
+ case 2:
+ return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
+
+ // current version
+ case KeyedBackendSerializationProxy.VERSION:
+ return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
+
+ default:
+ // guard for future
+ throw new IllegalStateException(
+ "Unrecognized keyed backend state meta info writer version: " + version);
+ }
+ }
+
+ public interface KeyedBackendStateMetaInfoWriter {
+ void writeStateMetaInfo(DataOutputView out) throws IOException;
+ }
+
+ static abstract class AbstractKeyedBackendStateMetaInfoWriter<N, S> implements KeyedBackendStateMetaInfoWriter {
+
+ protected final RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo;
+
+ public AbstractKeyedBackendStateMetaInfoWriter(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+ this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
+ }
+
+ }
+
+ static class KeyedBackendStateMetaInfoWriterV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+ public KeyedBackendStateMetaInfoWriterV1V2(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+ super(stateMetaInfo);
+ }
+
+ @Override
+ public void writeStateMetaInfo(DataOutputView out) throws IOException {
+ out.writeInt(stateMetaInfo.getStateType().ordinal());
+ out.writeUTF(stateMetaInfo.getName());
+
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out);
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out);
+ }
+ }
+
+ static class KeyedBackendStateMetaInfoWriterV3<N, S> extends AbstractKeyedBackendStateMetaInfoWriter<N, S> {
+
+ public KeyedBackendStateMetaInfoWriterV3(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> stateMetaInfo) {
+ super(stateMetaInfo);
+ }
+
+ @Override
+ public void writeStateMetaInfo(DataOutputView out) throws IOException {
+ out.writeInt(stateMetaInfo.getStateType().ordinal());
+ out.writeUTF(stateMetaInfo.getName());
+
+ // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
+ try (
+ ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+ DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
+
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper);
+
+ // write current offset, which represents the start offset of the state serializer
+ out.writeInt(outWithPos.getPosition());
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper);
+
+ // write current offset, which represents the start of the configuration snapshots
+ out.writeInt(outWithPos.getPosition());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getNamespaceSerializerConfigSnapshot());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getStateSerializerConfigSnapshot());
+
+ // write total number of bytes and then flush
+ out.writeInt(outWithPos.getPosition());
+ out.write(outWithPos.getBuf(), 0, outWithPos.getPosition());
+ }
+ }
+ }
+
+
+ // -------------------------------------------------------------------------------
+ // Readers
+ // - v1, v2: Flink 1.2.x
+ // - v3: Flink 1.3.x
+ // -------------------------------------------------------------------------------
+
+ public static KeyedBackendStateMetaInfoReader getReaderForVersion(
+ int version, ClassLoader userCodeClassLoader) {
+
+ switch (version) {
+ case 1:
+ case 2:
+ return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
+
+ // current version
+ case KeyedBackendSerializationProxy.VERSION:
+ return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
+
+ default:
+ // guard for future
+ throw new IllegalStateException(
+ "Unrecognized keyed backend state meta info reader version: " + version);
+ }
+ }
+
+ public interface KeyedBackendStateMetaInfoReader<N, S> {
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException;
+ }
+
+ static abstract class AbstractKeyedBackendStateMetaInfoReader implements KeyedBackendStateMetaInfoReader {
+
+ protected final ClassLoader userCodeClassLoader;
+
+ public AbstractKeyedBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ }
+
+ }
+
+ static class KeyedBackendStateMetaInfoReaderV1V2<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
+
+ public KeyedBackendStateMetaInfoReaderV1V2(ClassLoader userCodeClassLoader) {
+ super(userCodeClassLoader);
+ }
+
+ @Override
+ public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
+ new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+ metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+ metaInfo.setName(in.readUTF());
+
+ final TypeSerializerSerializationProxy<N> namespaceSerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ namespaceSerializerProxy.read(in);
+ metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+
+ final TypeSerializerSerializationProxy<S> stateSerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ stateSerializerProxy.read(in);
+ metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+
+ // older versions do not contain the configuration snapshot
+ metaInfo.setNamespaceSerializerConfigSnapshot(null);
+ metaInfo.setStateSerializerConfigSnapshot(null);
+
+ return metaInfo;
+ }
+ }
+
+ static class KeyedBackendStateMetaInfoReaderV3<N, S> extends AbstractKeyedBackendStateMetaInfoReader {
+
+ public KeyedBackendStateMetaInfoReaderV3(ClassLoader userCodeClassLoader) {
+ super(userCodeClassLoader);
+ }
+
+ @Override
+ public RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> readStateMetaInfo(DataInputView in) throws IOException {
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> metaInfo =
+ new RegisteredKeyedBackendStateMetaInfo.Snapshot<>();
+
+ metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]);
+ metaInfo.setName(in.readUTF());
+
+ // read offsets
+ int stateSerializerStartOffset = in.readInt();
+ int configSnapshotsStartOffset = in.readInt();
+
+ int totalBytes = in.readInt();
+
+ byte[] buffer = new byte[totalBytes];
+ in.readFully(buffer);
+
+ ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+ DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos);
+
+ try {
+ final TypeSerializerSerializationProxy<N> namespaceSerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ namespaceSerializerProxy.read(inViewWrapper);
+ metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer());
+ } catch (IOException e) {
+ metaInfo.setNamespaceSerializer(null);
+ }
+
+ // make sure we start from the state serializer bytes position
+ inWithPos.setPosition(stateSerializerStartOffset);
+ try {
+ final TypeSerializerSerializationProxy<S> stateSerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ stateSerializerProxy.read(inViewWrapper);
+ metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer());
+ } catch (IOException e) {
+ metaInfo.setStateSerializer(null);
+ }
+
+ // make sure we start from the config snapshot bytes position
+ inWithPos.setPosition(configSnapshotsStartOffset);
+ metaInfo.setNamespaceSerializerConfigSnapshot(
+ TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+ metaInfo.setStateSerializerConfigSnapshot(
+ TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+
+ return metaInfo;
+ }
+ }
+}