Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:20 UTC

[01/15] flink git commit: [FLINK-6869] [scala] Specify serialVersionUID for all Scala serializers

Repository: flink
Updated Branches:
  refs/heads/master 6abbba248 -> 23c82e3cc


[FLINK-6869] [scala] Specify serialVersionUID for all Scala serializers

Previously, the Scala serializers did not specify a serialVersionUID,
which prevented restoring from snapshots taken with previous Flink
versions, because the serializers' implementations had changed.

The serialVersionUIDs added in this commit are identical to what they
were (as generated by Java) in Flink 1.2, so that we can at least
restore state that was written with the Scala serializers as of 1.2.
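
As a hedged illustration (not part of this commit; object name is made
up), the default serialVersionUID that Java derives for a class can be
looked up with java.io.ObjectStreamClass; running such a check against a
Flink 1.2 classpath is one way to obtain values like those declared above:

    import java.io.ObjectStreamClass
    import org.apache.flink.api.scala.typeutils.UnitSerializer

    // Sketch: prints the serialVersionUID that Java computes for the class.
    // On a pre-1.3 classpath (no @SerialVersionUID declared) this is the
    // auto-generated value that the annotations in this commit pin down.
    object PrintGeneratedUid {
      def main(args: Array[String]): Unit = {
        val uid = ObjectStreamClass.lookup(classOf[UnitSerializer]).getSerialVersionUID
        println(s"UnitSerializer serialVersionUID: ${uid}L")
      }
    }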


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75ea808b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75ea808b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75ea808b

Branch: refs/heads/master
Commit: 75ea808bb163774898604a6374b8486e05520497
Parents: 6edb72d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Jun 8 15:29:45 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:26:48 2017 +0200

----------------------------------------------------------------------
 .../flink/api/scala/typeutils/CaseClassSerializer.scala   | 10 ++++++++++
 .../flink/api/scala/typeutils/EitherSerializer.scala      |  1 +
 .../flink/api/scala/typeutils/EnumValueSerializer.scala   |  1 +
 .../flink/api/scala/typeutils/OptionSerializer.scala      |  1 +
 .../flink/api/scala/typeutils/TraversableSerializer.scala |  1 +
 .../apache/flink/api/scala/typeutils/TrySerializer.scala  |  1 +
 .../apache/flink/api/scala/typeutils/UnitSerializer.scala |  1 +
 7 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 29b4952..c8222d6 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -28,6 +28,7 @@ import org.apache.flink.types.NullFieldException
  * our Java Tuples so we have to treat them differently.
  */
 @Internal
+@SerialVersionUID(7341356073446263475L)
 abstract class CaseClassSerializer[T <: Product](
     clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
@@ -80,6 +81,15 @@ abstract class CaseClassSerializer[T <: Product](
     createInstance(fields)
   }
 
+  override def createSerializerInstance(
+      tupleClass: Class[T],
+      fieldSerializers: Array[TypeSerializer[_]]): TupleSerializerBase[T] = {
+    this.getClass
+      .getConstructors()(0)
+      .newInstance(tupleClass, fieldSerializers)
+      .asInstanceOf[CaseClassSerializer[T]]
+  }
+
   def copy(from: T, reuse: T): T = {
     copy(from)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 1095aee..439e0c2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -26,6 +26,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
  * Serializer for [[Either]].
  */
 @Internal
+@SerialVersionUID(9219995873023657525L)
 class EitherSerializer[A, B, T <: Either[A, B]](
     val leftSerializer: TypeSerializer[A],
     val rightSerializer: TypeSerializer[B])

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index d549623..50526f5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -30,6 +30,7 @@ import org.apache.flink.util.{InstantiationUtil, Preconditions}
  * Serializer for [[Enumeration]] values.
  */
 @Internal
+@SerialVersionUID(-2403076635594572920L)
 class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[E#Value] {
 
   type T = E#Value

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 810c91c..aa4a0ea 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
  * Serializer for [[Option]].
  */
 @Internal
+@SerialVersionUID(-8635243274072627338L)
 class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   extends TypeSerializer[Option[A]] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 5963987..b54193b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -29,6 +29,7 @@ import scala.collection.generic.CanBuildFrom
  * Serializer for Scala Collections.
  */
 @Internal
+@SerialVersionUID(7522917416391312410L)
 abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     var elementSerializer: TypeSerializer[E])
   extends TypeSerializer[T] with Cloneable {

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index a88cce7..e128157 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -29,6 +29,7 @@ import scala.util.{Failure, Success, Try}
  * Serializer for [[scala.util.Try]].
  */
 @Internal
+@SerialVersionUID(-3052182891252564491L)
 class TrySerializer[A](
     private val elemSerializer: TypeSerializer[A],
     private val executionConfig: ExecutionConfig)

http://git-wip-us.apache.org/repos/asf/flink/blob/75ea808b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
index d80bc9b..32c44d2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 @Internal
+@SerialVersionUID(5413377487955047394L)
 class UnitSerializer extends TypeSerializerSingleton[Unit] {
 
   def isImmutableType(): Boolean = true


[14/15] flink git commit: [FLINK-6803] [tests] Fully enable PojoSerializerUpgradeTests for all state backends

Posted by tz...@apache.org.
[FLINK-6803] [tests] Fully enable PojoSerializerUpgradeTests for all state backends

With the fixes for the PojoSerializer in place, this commit fully enables
all tests for upgrading the PojoSerializer for all state backends; these
tests could not pass before.

This closes #4044.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b26460b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b26460b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b26460b

Branch: refs/heads/master
Commit: 8b26460b72bc4322aea7a9feaa3a728646c0399a
Parents: f0f2e99b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 22:48:45 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:39:17 2017 +0200

----------------------------------------------------------------------
 .../typeserializerupgrade/PojoSerializerUpgradeTest.java  | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b26460b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index a925d43..e4dd535 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -52,7 +51,6 @@ import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -74,7 +72,6 @@ import static org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.CH
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeTrue;
 
 @RunWith(Parameterized.class)
 public class PojoSerializerUpgradeTest extends TestLogger {
@@ -181,7 +178,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	/**
 	 * We should be able to handle a changed field order of a POJO as operator state
 	 */
-	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
 	public void testChangedFieldOrderWithOperatorState() throws Exception {
 		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);
@@ -192,7 +188,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	 */
 	@Test
 	public void testChangedFieldTypesWithKeyedState() throws Exception {
-		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
 			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, true);
 			fail("Expected a state migration exception.");
@@ -208,7 +203,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	/**
 	 * Changing field types of a POJO as operator state should require a state migration
 	 */
-	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
 	public void testChangedFieldTypesWithOperatorState() throws Exception {
 		try {
@@ -228,7 +222,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	 */
 	@Test
 	public void testAdditionalFieldWithKeyedState() throws Exception {
-		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
 			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
 			fail("Expected a state migration exception.");
@@ -244,7 +237,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	/**
 	 * Adding fields to a POJO as operator state should require a state migration
 	 */
-	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
 	public void testAdditionalFieldWithOperatorState() throws Exception {
 		try {
@@ -262,7 +254,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	/**
 	 * Removing fields from a POJO as keyed state should require a state migration
 	 */
-	@Ignore("Ignore this test until FLINK-6801 has been fixed.")
 	@Test
 	public void testMissingFieldWithKeyedState() throws Exception {
 		try {
@@ -280,7 +271,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	/**
 	 * Removing fields from a POJO as operator state should require a state migration
 	 */
-	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
 	public void testMissingFieldWithOperatorState() throws Exception {
 		try {


[02/15] flink git commit: [hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

Posted by tz...@apache.org.
[hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

Prior to this commit, the configuration snapshot classes of Scala
serializers did not have the proper default empty constructor that is
used for deserializing the configuration snapshot.

Since some Scala serializers' config snapshots extend the Java
CompositeTypeSerializerConfigSnapshot, those config snapshot classes are
now implemented in Java, because in Scala a subclass can only call a
single base class constructor.
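
For context, a minimal sketch of the Scala limitation (class names are
hypothetical): every auxiliary constructor must delegate to the primary
constructor, so a subclass cannot invoke one base class constructor from
its primary constructor and a different one from its nullary constructor:

    // Hypothetical stand-in for a Java base class with two constructors,
    // similar in shape to CompositeTypeSerializerConfigSnapshot.
    class SnapshotBase(nestedSerializer: AnyRef) {
      def this() = this(null) // empty constructor used during deserialization
    }

    // In Scala, only the primary constructor can call into the superclass, so
    // the nullary constructor below still goes through SnapshotBase(null);
    // there is no way to route it to SnapshotBase() instead -- hence the move
    // to Java for the config snapshot classes.
    class OptionSnapshot(serializer: AnyRef) extends SnapshotBase(serializer) {
      def this() = this(null)
    }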


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6edb72d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6edb72d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6edb72d4

Branch: refs/heads/master
Commit: 6edb72d42f95978cdb426b0ca1d8d4c50a3c700a
Parents: 6abbba2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Jun 8 08:52:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:26:48 2017 +0200

----------------------------------------------------------------------
 .../ScalaOptionSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../ScalaTrySerializerConfigSnapshot.java       | 50 ++++++++++++++
 .../TraversableSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../scala/typeutils/EnumValueSerializer.scala   | 14 ++--
 .../api/scala/typeutils/OptionSerializer.scala  | 62 ++++++++++-------
 .../scala/typeutils/TraversableSerializer.scala | 25 ++-----
 .../api/scala/typeutils/TrySerializer.scala     | 72 +++++++++++---------
 7 files changed, 233 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
new file mode 100644
index 0000000..03eef12
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link OptionSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaOptionSerializerConfigSnapshot() {}
+
+	public ScalaOptionSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
new file mode 100644
index 0000000..6abb3ea
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TrySerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class ScalaTrySerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaTrySerializerConfigSnapshot() {}
+
+	public ScalaTrySerializerConfigSnapshot(
+			TypeSerializer<E> elementSerializer,
+			TypeSerializer<Throwable> throwableSerializer) {
+
+		super(elementSerializer, throwableSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
new file mode 100644
index 0000000..9a39421
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TraversableSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class TraversableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public TraversableSerializerConfigSnapshot() {}
+
+	public TraversableSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 843079a..d549623 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerialize
 import org.apache.flink.api.common.typeutils.base.IntSerializer
 import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{InstantiationUtil, Preconditions}
 
 /**
  * Serializer for [[Enumeration]] values.
@@ -111,13 +111,17 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
 
 object EnumValueSerializer {
 
-  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var enumClass: Class[E])
+  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration]
       extends TypeSerializerConfigSnapshot {
 
-    var enumConstants: Array[E] = enumClass.getEnumConstants
+    var enumClass: Class[E] = _
+    var enumConstants: Array[E] = _
 
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+    def this(enumClass: Class[E]) = {
+      this()
+      this.enumClass = Preconditions.checkNotNull(enumClass)
+      this.enumConstants = enumClass.getEnumConstants
+    }
 
     override def write(out: DataOutputView): Unit = {
       super.write(out)

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 8adfb5c..810c91c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -100,46 +100,56 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot[A] = {
-    new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer)
+  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
+    new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = {
+
     configSnapshot match {
       case optionSerializerConfigSnapshot
+          : ScalaOptionSerializerConfigSnapshot[A] =>
+        ensureCompatibility(optionSerializerConfigSnapshot)
+      case legacyOptionSerializerConfigSnapshot
           : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-          elemSerializer)
-
-        if (compatResult.isRequiresMigration) {
-          if (compatResult.getConvertDeserializer != null) {
-            CompatibilityResult.requiresMigration(
-              new OptionSerializer[A](
-                new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
-          } else {
-            CompatibilityResult.requiresMigration()
-          }
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+        ensureCompatibility(legacyOptionSerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+      elemSerializer)
+
+    if (compatResult.isRequiresMigration) {
+      if (compatResult.getConvertDeserializer != null) {
+        CompatibilityResult.requiresMigration(
+          new OptionSerializer[A](
+            new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
+      } else {
+        CompatibilityResult.requiresMigration()
+      }
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object OptionSerializer {
 
-  class OptionSerializerConfigSnapshot[A](
-      private val elemSerializer: TypeSerializer[A])
-    extends CompositeTypeSerializerConfigSnapshot(elemSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class OptionSerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 6299a24..5963987 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -151,16 +151,16 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
+  override def snapshotConfiguration(): TraversableSerializerConfigSnapshot[E] = {
+    new TraversableSerializerConfigSnapshot[E](elementSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
 
     configSnapshot match {
-      case traversableSerializerConfigSnapshot:
-          TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+      case traversableSerializerConfigSnapshot
+          : TraversableSerializerConfigSnapshot[E] =>
 
         val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
           traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
@@ -178,20 +178,3 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     }
   }
 }
-
-object TraversableSerializer {
-
-  class TraversableSerializerConfigSnapshot[E](
-      private var elementSerializer: TypeSerializer[E])
-    extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
-
-    override def getVersion = TraversableSerializerConfigSnapshot.VERSION
-  }
-
-  object TraversableSerializerConfigSnapshot {
-    val VERSION = 1
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 641caa1..a88cce7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -103,51 +103,59 @@ class TrySerializer[A](
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TrySerializer.TrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
+  override def snapshotConfiguration(): ScalaTrySerializerConfigSnapshot[A] = {
+    new ScalaTrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = {
 
     configSnapshot match {
-      case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot[A] =>
-        val previousSerializersAndConfigs =
-          trySerializerConfigSnapshot.getNestedSerializersAndConfigs
-
-        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(0).f1,
-          elemSerializer)
-
-        val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(1).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(1).f1,
-          throwableSerializer)
-
-        if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
-          CompatibilityResult.requiresMigration()
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+      case trySerializerConfigSnapshot
+          : ScalaTrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(trySerializerConfigSnapshot)
+      case legacyTrySerializerConfigSnapshot
+          : TrySerializer.TrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(legacyTrySerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val previousSerializersAndConfigs =
+      compositeConfigSnapshot.getNestedSerializersAndConfigs
+
+    val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(0).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(0).f1,
+      elemSerializer)
+
+    val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(1).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(1).f1,
+      throwableSerializer)
+
+    if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
+      CompatibilityResult.requiresMigration()
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object TrySerializer {
 
-  class TrySerializerConfigSnapshot[A](
-      private var elemSerializer: TypeSerializer[A],
-      private var throwableSerializer: TypeSerializer[Throwable])
-    extends CompositeTypeSerializerConfigSnapshot(
-      elemSerializer, throwableSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null, null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class TrySerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot() {
 
     override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
   }


[03/15] flink git commit: [FLINK-6869] [core] Tolerate serialVersionUID mismatches for Scala and anonymous serializers

Posted by tz...@apache.org.
[FLINK-6869] [core] Tolerate serialVersionUID mismatches for Scala and anonymous serializers

This commit makes the TypeSerializerSerializationProxy tolerant of
serialVersionUID mismatches when reading serializers implemented as
anonymous classes or our Scala serializers.

Our Scala serializers require this because they are generated at compile
time via Scala macros, so it is not possible to fix a certain
serialVersionUID for them. For the non-generated Scala serializers we
also need this, because their pre-1.3 serialVersionUIDs may vary
depending on the Scala version used.

This can be seen as a workaround, and should be reverted once 1.2
savepoint compatibility is no longer maintained.

This commit also updates the streaming state docs to advise users to
avoid using anonymous classes for their state serializers.
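
As a hedged illustration of that advice (object name is made up), anonymous
classes receive positional names, so the classname written into a snapshot
can silently change when the enclosing class is edited:

    // Sketch: anonymous classes get names such as AnonymousClassNames$$anon$1.
    // Adding, removing, or reordering instances in the enclosing class changes
    // those names, which would make a previously written serializer unreadable.
    object AnonymousClassNames {
      def main(args: Array[String]): Unit = {
        val first = new java.io.Serializable {}
        val second = new java.io.Serializable {}
        println(first.getClass.getName)  // e.g. AnonymousClassNames$$anon$1
        println(second.getClass.getName) // e.g. AnonymousClassNames$$anon$2
      }
    }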

This closes #4090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b216a4a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b216a4a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b216a4a0

Branch: refs/heads/master
Commit: b216a4a0acf4e4d0463c3ed961d6a0258223491a
Parents: 75ea808
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Jun 10 22:41:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:37:01 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md                        |   7 +
 .../TypeSerializerSerializationUtil.java        |  82 +++++++++-
 .../TypeSerializerSerializationUtilTest.java    | 158 ++++++++++++++++++-
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  14 +-
 4 files changed, 254 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 97f0c29..0025fae 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -453,6 +453,13 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
+Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
+subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
+anonymous classes as your state serializers. Anonymous classes do not have a guaranteed generated classname; the name
+varies across compilers and depends on the order in which they are instantiated within the enclosing class. This can
+easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
+classpath).
+
 ### Handling serializer upgrades and compatibility
 
 Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 3d79d9a..058ef46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -32,10 +32,16 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InvalidClassException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utility methods for serialization of {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
@@ -46,6 +52,67 @@ public class TypeSerializerSerializationUtil {
 	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
 
 	/**
+	 * This is maintained as a temporary workaround for FLINK-6869.
+	 *
+	 * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID.
+	 * Although since 1.3 they are properly specified, we still have to ignore them for now
+	 * as their previous serialVersionUIDs will vary depending on the Scala version.
+	 *
+	 * <p>This can be removed once 1.2 is no longer supported.
+	 */
+	private static Set<String> scalaSerializerClassnames = new HashSet<>();
+	static {
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
+	}
+
+	/**
+	 * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of
+	 * anonymous classes or our Scala serializer classes.
+	 *
+	 * <p>The {@link TypeSerializerSerializationProxy} uses this specific object input stream to read serializers,
+	 * so that mismatching serialVersionUIDs of anonymous classes / Scala serializers are ignored.
+	 * This is a required workaround to maintain backwards compatibility for our pre-1.3 Scala serializers.
+	 * See FLINK-6869 for details.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
+	 */
+	public static class SerialUIDMismatchTolerantInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+
+		public SerialUIDMismatchTolerantInputStream(InputStream in, ClassLoader cl) throws IOException {
+			super(in, cl);
+		}
+
+		@Override
+		protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+			ObjectStreamClass streamClassDescriptor = super.readClassDescriptor();
+
+			Class localClass = resolveClass(streamClassDescriptor);
+			if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass()
+				// isAnonymousClass does not work for anonymous Scala classes; additionally check by classname
+				|| localClass.getName().contains("$anon$") || localClass.getName().contains("$anonfun")) {
+
+				ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
+				if (localClassDescriptor != null
+					&& localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) {
+					LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.",
+						streamClassDescriptor.getName(), streamClassDescriptor.getSerialVersionUID(), localClassDescriptor.getSerialVersionUID());
+
+					streamClassDescriptor = localClassDescriptor;
+				}
+			}
+
+			return streamClassDescriptor;
+		}
+	}
+
+	/**
 	 * Writes a {@link TypeSerializer} to the provided data output view.
 	 *
 	 * <p>It is written with a format that can be later read again using
@@ -354,6 +421,7 @@ public class TypeSerializerSerializationUtil {
 			}
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public void read(DataInputView in) throws IOException {
 			super.read(in);
@@ -362,8 +430,14 @@ public class TypeSerializerSerializationUtil {
 			int serializerBytes = in.readInt();
 			byte[] buffer = new byte[serializerBytes];
 			in.readFully(buffer);
-			try {
-				typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+
+			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+			try (
+				SerialUIDMismatchTolerantInputStream ois =
+					new SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
+
+				Thread.currentThread().setContextClassLoader(userClassLoader);
+				typeSerializer = (TypeSerializer<T>) ois.readObject();
 			} catch (ClassNotFoundException | InvalidClassException e) {
 				if (useDummyPlaceholder) {
 					// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
@@ -372,8 +446,10 @@ public class TypeSerializerSerializationUtil {
 						new UnloadableDummyTypeSerializer<>(buffer);
 					LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
 				} else {
-					throw new IOException("Missing class for type serializer.", e);
+					throw new IOException("Unloadable class for type serializer.", e);
 				}
+			} finally {
+				Thread.currentThread().setContextClassLoader(previousClassLoader);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 738644b..10df619 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -29,7 +29,9 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -39,8 +41,11 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -55,7 +60,10 @@ import static org.mockito.Mockito.mock;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(TypeSerializerSerializationUtil.class)
-public class TypeSerializerSerializationUtilTest {
+public class TypeSerializerSerializationUtilTest implements Serializable {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
 	 * Verifies that reading and writing serializers work correctly.
@@ -236,6 +244,36 @@ public class TypeSerializerSerializationUtilTest {
 		Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
 	}
 
+	/**
+	 * Verifies that serializers of anonymous classes can be deserialized, even if serialVersionUID changes.
+	 */
+	@Test
+	public void testAnonymousSerializerClassWithChangedSerialVersionUID() throws Exception {
+
+		TypeSerializer anonymousClassSerializer = new AbstractIntSerializer() {};
+		// assert that our assumption holds
+		Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
+
+		byte[] anonymousSerializerBytes;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), anonymousClassSerializer);
+			anonymousSerializerBytes = out.toByteArray();
+		}
+
+		long newSerialVersionUID = 1234567L;
+		// assert that we're actually modifying to a different serialVersionUID
+		Assert.assertNotEquals(ObjectStreamClass.lookup(anonymousClassSerializer.getClass()).getSerialVersionUID(), newSerialVersionUID);
+		modifySerialVersionUID(anonymousSerializerBytes, anonymousClassSerializer.getClass().getName(), newSerialVersionUID);
+
+		try (ByteArrayInputStream in = new ByteArrayInputStream(anonymousSerializerBytes)) {
+			anonymousClassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		// serializer should have been deserialized despite serialVersionUID mismatch
+		Assert.assertNotNull(anonymousClassSerializer);
+		Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
+	}
+
 	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
 
 		static final int VERSION = 1;
@@ -292,4 +330,122 @@ public class TypeSerializerSerializationUtilTest {
 			return 31 * val + msg.hashCode();
 		}
 	}
+
+	private static void modifySerialVersionUID(byte[] objectBytes, String classname, long newSerialVersionUID) throws Exception {
+		byte[] classnameBytes = classname.getBytes();
+
+		// serialVersionUID follows directly after classname in the object byte stream;
+		// advance serialVersionUIDPosition until end of classname in stream
+		int serialVersionUIDOffset;
+		boolean foundClass = false;
+		int numMatchedBytes = 0;
+		for (serialVersionUIDOffset = 0; serialVersionUIDOffset < objectBytes.length; serialVersionUIDOffset++) {
+			if (objectBytes[serialVersionUIDOffset] == classnameBytes[numMatchedBytes]) {
+				numMatchedBytes++;
+				foundClass = true;
+			} else {
+				if (objectBytes[serialVersionUIDOffset] == classnameBytes[0]) {
+					numMatchedBytes = 1;
+				} else {
+					numMatchedBytes = 0;
+					foundClass = false;
+				}
+			}
+
+			if (numMatchedBytes == classnameBytes.length) {
+				break;
+			}
+		}
+
+		if (!foundClass) {
+			throw new RuntimeException("Could not find class " + classname + " in object byte stream.");
+		}
+
+		byte[] newUIDBytes = ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(newSerialVersionUID).array();
+
+		// replace original serialVersionUID bytes with new serialVersionUID bytes
+		for (int uidIndex = 0; uidIndex < newUIDBytes.length; uidIndex++) {
+			objectBytes[serialVersionUIDOffset + 1 + uidIndex] = newUIDBytes[uidIndex];
+		}
+	}
+
+	public static abstract class AbstractIntSerializer extends TypeSerializer<Integer> {
+
+		public static final long serialVersionUID = 1;
+
+		@Override
+		public Integer createInstance() {
+			return IntSerializer.INSTANCE.createInstance();
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return IntSerializer.INSTANCE.isImmutableType();
+		}
+
+		@Override
+		public Integer copy(Integer from) {
+			return IntSerializer.INSTANCE.copy(from);
+		}
+
+		@Override
+		public Integer copy(Integer from, Integer reuse) {
+			return IntSerializer.INSTANCE.copy(from, reuse);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public Integer deserialize(DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(source);
+		}
+
+		@Override
+		public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(reuse, source);
+		}
+
+		@Override
+		public void serialize(Integer record, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.serialize(record, target);
+		}
+
+		@Override
+		public TypeSerializer<Integer> duplicate() {
+			return IntSerializer.INSTANCE.duplicate();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return IntSerializer.INSTANCE.snapshotConfiguration();
+		}
+
+		@Override
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
+		}
+
+		@Override
+		public int getLength() {
+			return IntSerializer.INSTANCE.getLength();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return IntSerializer.INSTANCE.canEqual(obj);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return IntSerializer.INSTANCE.equals(obj);
+		}
+
+		@Override
+		public int hashCode() {
+			return IntSerializer.INSTANCE.hashCode();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index e52323f..dc322c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +156,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 			super(userCodeClassLoader);
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
 			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
@@ -164,12 +164,20 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
 			stateMetaInfo.setName(in.readUTF());
 			stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
 			DataInputViewStream dis = new DataInputViewStream(in);
-			try {
-				TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+			try (
+				TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
+					new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) {
+
+				Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+				TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject();
 				stateMetaInfo.setPartitionStateSerializer(stateSerializer);
 			} catch (ClassNotFoundException exception) {
 				throw new IOException(exception);
+			} finally {
+				Thread.currentThread().setContextClassLoader(previousClassLoader);
 			}
 
 			// old versions do not contain the partition state serializer's configuration snapshot


[08/15] flink git commit: [hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase

Posted by tz...@apache.org.
[hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a45e4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a45e4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a45e4b

Branch: refs/heads/master
Commit: 61a45e4b2895905efcffd216789bf753fc9f5c56
Parents: 2d34af3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 15:51:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 10 +++++-----
 .../streaming/util/AbstractStreamOperatorTestHarness.java |  5 ++++-
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 758e894..7cbfb15 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1063,7 +1063,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws RocksDBException
 		 */
 		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-				throws IOException, ClassNotFoundException, RocksDBException {
+				throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
 
 			rocksDBKeyedStateBackend.createDB();
 
@@ -1089,7 +1089,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 */
 		private void restoreKeyGroupsInStateHandle()
-				throws IOException, RocksDBException, ClassNotFoundException {
+				throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1111,7 +1111,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
-		private void restoreKVStateMetaData() throws IOException, RocksDBException {
+		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
 			KeyedBackendSerializationProxy<K> serializationProxy =
 					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
@@ -1128,7 +1128,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 					"Aborting now since state migration is currently not available");
 			}
 
@@ -1248,7 +1248,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.isRequiresMigration()) {
 
 					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+					throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 						"Aborting now since state migration is currently not available");
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index fd781f6..47e8726 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -186,7 +186,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
+
+		ClassLoader cl = environment.getUserClassLoader();
+		when(mockTask.getUserCodeClassLoader()).thenReturn(cl);
+
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
 		when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 


[15/15] flink git commit: [FLINK-6848] [doc] Update managed state docs to include Scala snippets

Posted by tz...@apache.org.
[FLINK-6848] [doc] Update managed state docs to include Scala snippets

Add an example of how to work with managed state in Scala.

This closes #4072.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23c82e3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23c82e3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23c82e3c

Branch: refs/heads/master
Commit: 23c82e3cc9d632c17850f3c7d2b3a1ab0a0cd5cb
Parents: 8b26460
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Sun Jun 4 16:08:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 07:14:38 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md | 212 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23c82e3c/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0025fae..65d0d75 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -115,7 +115,7 @@ of elements that are added to the state. The interface is the same as for `ListS
 added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
 
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
-retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or 
+retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
 views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
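
As a quick illustration of the `MapState` calls named above (a hedged sketch, assuming a keyed context and that the runtime-context accessor mirrors `getState`/`getListState`; the state name "counts" is made up):

    MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("counts", String.class, Long.class);

    MapState<String, Long> counts = getRuntimeContext().getMapState(descriptor);

    counts.put("hello", 1L);                 // add a single mapping
    Long current = counts.get("hello");      // retrieve the value for a user key
    for (Map.Entry<String, Long> entry : counts.entries()) {
        // iterate over all currently stored mappings
    }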
 
@@ -152,6 +152,8 @@ is available in a `RichFunction` has these methods for accessing state:
 
 This is an example `FlatMapFunction` that shows how all of the parts fit together:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
@@ -201,6 +203,66 @@ env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2
 
 // the printed output will be (1,4) and (1,5)
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+  private var sum: ValueState[(Long, Long)] = _
+
+  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
+
+    // access the state value
+    val tmpCurrentSum = sum.value
+
+    // If it hasn't been used before, it will be null
+    val currentSum = if (tmpCurrentSum != null) {
+      tmpCurrentSum
+    } else {
+      (0L, 0L)
+    }
+
+    // update the count
+    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
+
+    // update the state
+    sum.update(newSum)
+
+    // if the count reaches 2, emit the average and clear the state
+    if (newSum._1 >= 2) {
+      out.collect((input._1, newSum._2 / newSum._1))
+      sum.clear()
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    sum = getRuntimeContext.getState(
+      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
+    )
+  }
+}
+
+
+object ExampleCountWindowAverage extends App {
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  env.fromCollection(List(
+    (1L, 3L),
+    (1L, 5L),
+    (1L, 7L),
+    (1L, 4L),
+    (1L, 2L)
+  )).keyBy(_._1)
+    .flatMap(new CountWindowAverage())
+    .print()
+  // the printed output will be (1,4) and (1,5)
+
+  env.execute("ExampleManagedState")
+}
+{% endhighlight %}
+</div>
+</div>
 
 This example implements a poor man's counting window. We key the tuples by the first field
 (in the example all have the same key `1`). The function stores the count and a running sum in
@@ -268,6 +330,8 @@ Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction
 to buffer elements before sending them to the outside world. It demonstrates
 the basic even-split redistribution list state:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class BufferingSink
         implements SinkFunction<Tuple2<String, Integer>>,
@@ -311,7 +375,7 @@ public class BufferingSink
                 "buffered-elements",
                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                 Tuple2.of(0L, 0L));
-                
+
         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
         if (context.isRestored()) {
@@ -328,6 +392,59 @@ public class BufferingSink
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class BufferingSink(threshold: Int = 0)
+  extends SinkFunction[(String, Int)]
+    with CheckpointedFunction
+    with CheckpointedRestoring[List[(String, Int)]] {
+
+  @transient
+  private var checkpointedState: ListState[(String, Int)] = null
+
+  private val bufferedElements = ListBuffer[(String, Int)]()
+
+  override def invoke(value: (String, Int)): Unit = {
+    bufferedElements += value
+    if (bufferedElements.size == threshold) {
+      for (element <- bufferedElements) {
+        // send it to the sink
+      }
+      bufferedElements.clear()
+    }
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    checkpointedState.clear()
+    for (element <- bufferedElements) {
+      checkpointedState.add(element)
+    }
+  }
+
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    val descriptor = new ListStateDescriptor[(String, Int)](
+      "buffered-elements",
+      TypeInformation.of(new TypeHint[(String, Int)]() {})
+    )
+
+    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+    if (context.isRestored) {
+      for (element <- checkpointedState.get()) {
+        bufferedElements += element
+      }
+    }
+  }
+
+  override def restoreState(state: List[(String, Int)]): Unit = {
+    bufferedElements ++= state
+  }
+}
+{% endhighlight %}
+</div>
+</div>
 
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
@@ -337,16 +454,32 @@ Note how the state is initialized, similar to keyed state,
 with a `StateDescriptor` that contains the state name and information
 about the type of the value that the state holds:
 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "buffered-elements",
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
-        Tuple2.of(0L, 0L));
+        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
 
 checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 {% endhighlight %}
 
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val descriptor = new ListStateDescriptor[(String, Long)](
+    "buffered-elements",
+    TypeInformation.of(new TypeHint[(String, Long)]() {})
+)
+
+checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+{% endhighlight %}
+</div>
+</div>
 The naming convention of the state access methods contains the redistribution
 pattern followed by the state structure. For example, to use list state with the
 union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
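
A short, hedged sketch of that distinction, inside `initializeState(FunctionInitializationContext context)` as in the `BufferingSink` example above (the state name "offsets" is made up):

    ListStateDescriptor<Long> descriptor =
        new ListStateDescriptor<>("offsets", Long.class);

    // even-split redistribution: on restore, each parallel instance gets a sub-list
    ListState<Long> evenSplitState = context.getOperatorStateStore().getListState(descriptor);

    // union redistribution: on restore, each parallel instance gets the complete list
    ListState<Long> unionState = context.getOperatorStateStore().getUnionListState(descriptor);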
@@ -385,6 +518,8 @@ Stateful sources require a bit more care as opposed to other operators.
 In order to make the updates to the state and output collection atomic (required for exactly-once semantics
 on failure/recovery), the user is required to get a lock from the source's context.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public static class CounterSource
         extends RichParallelSourceFunction<Long>
@@ -426,6 +561,46 @@ public static class CounterSource
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CounterSource
+       extends RichParallelSourceFunction[Long]
+       with ListCheckpointed[Long] {
+
+  @volatile
+  private var isRunning = true
+
+  private var offset = 0L
+
+  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (isRunning) {
+      // output and state update are atomic
+      lock.synchronized({
+        ctx.collect(offset)
+
+        offset += 1
+      })
+    }
+  }
+
+  override def cancel(): Unit = isRunning = false
+
+  override def restoreState(state: util.List[Long]): Unit =
+    for (s <- state) {
+      offset = s
+    }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
+    Collections.singletonList(offset)
+
+}
+{% endhighlight %}
+</div>
+</div>
 
 Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
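
A hedged sketch of using that interface in a sink (the class name and the staging/commit logic are illustrative only):

    public class CommitOnCheckpointSink implements SinkFunction<String>, CheckpointListener {

        @Override
        public void invoke(String value) throws Exception {
            // stage the record somewhere that is not yet visible to the outside world
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // called once Flink has fully acknowledged the checkpoint;
            // only now make the staged records externally visible
        }
    }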
 
@@ -433,7 +608,7 @@ Some operators might need the information when a checkpoint is fully acknowledge
 
 This section is targeted as a guideline for users who require the use of custom serialization for their state, covering how
 to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
-Flink's own serializers, this section is irrelevant and can be skipped. 
+Flink's own serializers, this section is irrelevant and can be skipped.
 
 ### Using custom serializers
 
@@ -444,14 +619,33 @@ to specify the state's name, as well as information about the type of the state.
 It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
 simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
+
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "state-name",
-        new TypeSerializer<> {...});
+        new CustomTypeSerializer());
 
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+    "state-name",
+    new CustomTypeSerializer
+)
+
+checkpointedState = getRuntimeContext.getListState(descriptor)
+{% endhighlight %}
+</div>
+</div>
 
 Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
 subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
@@ -542,7 +736,7 @@ The above cases can be translated to code by returning one of the following from
 
   * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to
     be compatible, and Flink can proceed with the job with the serializer as is.
-    
+
   * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
     reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
     is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again
@@ -551,7 +745,7 @@ The above cases can be translated to code by returning one of the following from
   * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
     to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded
     to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort.
-  
+
 <span class="label label-danger">Attention</span> Currently, as of Flink 1.3, if the result of the compatibility check
 acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state
 migration is currently not available. The ability to migrate state will be introduced in future releases.
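
For orientation, a hedged sketch of where these results are typically returned from: the serializer's compatibility check (the `ensureCompatibility` hook as of Flink 1.3; `MyType`, `MyTypeSerializerConfigSnapshot`, `getFormatVersion()` and `CURRENT_FORMAT_VERSION` are invented placeholders):

    @Override
    public CompatibilityResult<MyType> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
        if (configSnapshot instanceof MyTypeSerializerConfigSnapshot) {
            MyTypeSerializerConfigSnapshot snapshot = (MyTypeSerializerConfigSnapshot) configSnapshot;
            if (snapshot.getFormatVersion() == CURRENT_FORMAT_VERSION) {
                // the bytes written previously can be read by this serializer as-is
                return CompatibilityResult.compatible();
            }
        }
        // unknown snapshot or changed format: the restored state has to be migrated first
        return CompatibilityResult.requiresMigration();
    }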
@@ -560,7 +754,7 @@ migration is currently not available. The ability to migrate state will be intro
 
 Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state
 values, the availability of the classes within the classpath may affect restore behaviour.
- 
+
 `TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new
 serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be
 able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified


[10/15] flink git commit: [FLINK-6803] [tests] Add test for PojoSerializer state upgrade

Posted by tz...@apache.org.
[FLINK-6803] [tests] Add test for PojoSerializer state upgrade

The added PojoSerializerUpgradeTest tests the state migration behaviour when the
underlying pojo type changes and one tries to recover from old state. Currently,
not all tests can be activated, because there are still some pending issues to
be fixed first. We should enable these tests once those issues have been fixed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d34af34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d34af34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d34af34

Branch: refs/heads/master
Commit: 2d34af345e241b23489b6bbdd2a752243c1e44fd
Parents: e35c575
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 31 18:37:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   5 +-
 .../state/RocksDBStateBackendTest.java          |   2 +-
 .../api/common/typeutils/CompatibilityUtil.java |   6 +-
 .../java/typeutils/runtime/FieldSerializer.java |   2 +-
 .../flink/util/StateMigrationException.java     |  38 ++
 .../state/heap/HeapKeyedStateBackend.java       |   3 +-
 .../runtime/state/StateBackendTestBase.java     | 133 ++----
 flink-streaming-java/pom.xml                    |   1 -
 .../util/AbstractStreamOperatorTestHarness.java |   1 -
 flink-tests/pom.xml                             |   8 +
 .../PojoSerializerUpgradeTest.java              | 445 +++++++++++++++++++
 11 files changed, 549 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 241c0b3..758e894 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -79,6 +79,7 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -1506,7 +1507,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@SuppressWarnings("rawtypes, unchecked")
 	protected <N, S> ColumnFamilyHandle getColumnFamily(
-			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
+			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
 
 		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
 				kvStateInformation.get(descriptor.getName());
@@ -1557,7 +1558,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				return stateInfo.f0;
 			} else {
 				// TODO state migration currently isn't possible.
-				throw new RuntimeException("State migration currently isn't supported.");
+				throw new StateMigrationException("State migration isn't supported, yet.");
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 8b44a47..cf363fa 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -96,7 +96,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	private ValueState<Integer> testState1;
 	private ValueState<String> testState2;
 
-	@Parameterized.Parameters
+	@Parameterized.Parameters(name = "Incremental checkpointing: {0}")
 	public static Collection<Boolean> parameters() {
 		return Arrays.asList(false, true);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 8959628..94bb9bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -19,6 +19,8 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
+
 /**
  * Utilities related to serializer compatibility.
  */
@@ -40,7 +42,7 @@ public class CompatibilityUtil {
 	 *      If yes, use that for state migration and simply return the result.
 	 *   6. If all of above fails, state migration is required but could not be performed; throw exception.
 	 *
-	 * @param precedingSerializer the preceding serializer used to write the data
+	 * @param precedingSerializer the preceding serializer used to write the data, null if none could be retrieved
 	 * @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder
 	 * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
 	 * @param newSerializer the new serializer to ensure compatibility with
@@ -51,7 +53,7 @@ public class CompatibilityUtil {
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> CompatibilityResult<T> resolveCompatibilityResult(
-			TypeSerializer<?> precedingSerializer,
+			@Nullable TypeSerializer<?> precedingSerializer,
 			Class<?> dummySerializerClassTag,
 			TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
 			TypeSerializer<T> newSerializer) {
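
For context, the call pattern used by the keyed backends elsewhere in this thread (see the HeapKeyedStateBackend and RocksDBKeyedStateBackend hunks); `restoredKeySerializer`, `restoredKeySerializerConfigSnapshot`, `newKeySerializer` and `dummySerializerClassTag` are placeholders for the values the backend has at hand:

    if (CompatibilityUtil.resolveCompatibilityResult(
            restoredKeySerializer,               // may be null if it could not be retrieved
            dummySerializerClassTag,             // class tag marking placeholder serializers
            restoredKeySerializerConfigSnapshot,
            newKeySerializer)
        .isRequiresMigration()) {

        // TODO replace with state migration; key hash codes need to remain the same after migration
        throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
            "Aborting now since state migration is currently not available");
    }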

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
index 5519889..56a4445 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -51,7 +51,7 @@ public class FieldSerializer {
 				clazz = clazz.getSuperclass();
 			}
 		}
-		throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+		throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup."
 				+ " (" + fieldName + ")");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java b/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
new file mode 100644
index 0000000..1667ff5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/StateMigrationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Base class for state migration related exceptions.
+ */
+public class StateMigrationException extends FlinkException {
+	private static final long serialVersionUID = 8268516412747670839L;
+
+	public StateMigrationException(String message) {
+		super(message);
+	}
+
+	public StateMigrationException(Throwable cause) {
+		super(cause);
+	}
+
+	public StateMigrationException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index d4ba204..2ab9691 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -399,7 +400,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						.isRequiresMigration()) {
 
 						// TODO replace with state migration; note that key hash codes need to remain the same after migration
-						throw new IllegalStateException("The new key serializer is not compatible to read previous keys. " +
+						throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 							"Aborting now since state migration is currently not available");
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 8d4a38e..f08ad2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -68,6 +68,7 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -1804,53 +1805,44 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testRestoreWithWrongKeySerializer() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
+	public void testRestoreWithWrongKeySerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
 
-			// use an IntSerializer at first
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		// use an IntSerializer at first
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 
-			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-			// write some state
-			backend.setCurrentKey(1);
-			state.update("1");
-			backend.setCurrentKey(2);
-			state.update("2");
+		// write some state
+		backend.setCurrentKey(1);
+		state.update("1");
+		backend.setCurrentKey(2);
+		state.update("2");
 
-			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// draw a snapshot
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
+		backend.dispose();
 
-			// restore with the wrong key serializer
-			try {
-				restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
+		// restore with the wrong key serializer
+		try {
+			restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
 
-				fail("should recognize wrong key serializer");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("The new key serializer is not compatible")) {
-					fail("wrong exception " + e);
-				}
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+			fail("should recognize wrong key serializer");
+		} catch (StateMigrationException ignored) {
+			// expected
 		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testValueStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testValueStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
 
 			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1880,29 +1872,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.value();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testListStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testListStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1931,29 +1915,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testReducingStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testReducingStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id",
 					new AppendingReduce(),
 					StringSerializer.INSTANCE);
@@ -1984,29 +1960,21 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.get();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
+		} finally {
 			backend.dispose();
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testMapStateRestoreWithWrongSerializers() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testMapStateRestoreWithWrongSerializers() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
+		try {
 			MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", StringSerializer.INSTANCE, StringSerializer.INSTANCE);
 			MapState<String, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -2025,7 +1993,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			@SuppressWarnings("unchecked")
 			TypeSerializer<String> fakeStringSerializer =
-					(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+				(TypeSerializer<String>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
 
 			try {
 				kvId = new MapStateDescriptor<>("id", fakeStringSerializer, StringSerializer.INSTANCE);
@@ -2035,19 +2003,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				state.entries();
 
 				fail("should recognize wrong serializers");
-			} catch (RuntimeException e) {
-				if (!e.getMessage().contains("State migration currently isn't supported")) {
-					fail("wrong exception " + e);
-				}
+			} catch (StateMigrationException ignored) {
 				// expected
-			} catch (Exception e) {
-				fail("wrong exception " + e);
 			}
 			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+		} finally {
+			backend.dispose();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-streaming-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index ab972a9..6976519 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -90,7 +90,6 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 6f2d349..fd781f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index dd6e949..4caf8a6 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -54,6 +54,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-optimizer_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/2d34af34/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
new file mode 100644
index 0000000..2769c50
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.typeserializerupgrade;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.util.DynamicCodeLoadingException;
+import org.apache.flink.util.StateMigrationException;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+@RunWith(Parameterized.class)
+public class PojoSerializerUpgradeTest extends TestLogger {
+
+	@Parameterized.Parameters(name = "StateBackend: {0}")
+	public static Collection<String> parameters() {
+		return Arrays.asList(
+			AbstractStateBackend.MEMORY_STATE_BACKEND_NAME,
+			AbstractStateBackend.FS_STATE_BACKEND_NAME,
+			AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME);
+	}
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private StateBackend stateBackend;
+
+	public PojoSerializerUpgradeTest(String backendType) throws IOException, DynamicCodeLoadingException {
+		Configuration config = new Configuration();
+		config.setString(CoreOptions.STATE_BACKEND, backendType);
+		config.setString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, temporaryFolder.newFolder().toURI().toString());
+		stateBackend = AbstractStateBackend.loadStateBackendFromConfig(config, Thread.currentThread().getContextClassLoader(), null);
+	}
+
+	private static final String POJO_NAME = "Pojo";
+
+	private static final String SOURCE_A =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"private String b; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// changed order of fields which should be recoverable
+	private static final String SOURCE_B =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private String b; " +
+		"private long a; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// changed type of a field which should not be recoverable
+	private static final String SOURCE_C =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private double a; " +
+		"private String b; " +
+		"public double getA() { return a;} " +
+		"public void setA(double value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b);} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \")\";}}";
+
+	// additional field which should not be recoverable
+	private static final String SOURCE_D =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"private String b; " +
+		"private double c; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"public String getB() { return b; }" +
+		"public void setB(String value) { b = value; }" +
+		"public double getC() { return c; } " +
+		"public void setC(double value) { c = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a && b.equals(other.b) && c == other.c;} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a, b, c); } " +
+		"@Override public String toString() {return \"(\" + a + \", \" + b + \", \" + c + \")\";}}";
+
+	// missing field which should not be recoverable
+	private static final String SOURCE_E =
+		"import java.util.Objects;" +
+		"public class Pojo { " +
+		"private long a; " +
+		"public long getA() { return a;} " +
+		"public void setA(long value) { a = value; }" +
+		"@Override public boolean equals(Object obj) { if (obj instanceof Pojo) { Pojo other = (Pojo) obj; return a == other.a;} else { return false; }}" +
+		"@Override public int hashCode() { return Objects.hash(a); } " +
+		"@Override public String toString() {return \"(\" + a + \")\";}}";
+
+	/**
+	 * We should be able to handle a changed field order
+	 */
+	@Test
+	public void testChangedFieldOrder() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B);
+	}
+
+	/**
+	 * Changing field types should require a state migration
+	 */
+	@Test
+	public void testChangedFieldTypes() throws Exception {
+		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Adding fields should require a state migration
+	 */
+	@Test
+	public void testAdditionalField() throws Exception {
+		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6801 has been fixed.")
+	@Test
+	public void testMissingField() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB) throws Exception {
+		final Configuration taskConfiguration = new Configuration();
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
+		final Collection<Long> inputs = Arrays.asList(1L, 2L, 45L, 67L, 1337L);
+
+		// run the program with classSourceA
+		File rootPath = temporaryFolder.newFolder();
+		File sourceFile = writeSourceFile(rootPath, POJO_NAME + ".java", classSourceA);
+		compileClass(sourceFile);
+
+		final ClassLoader classLoader = URLClassLoader.newInstance(
+			new URL[]{rootPath.toURI().toURL()},
+			Thread.currentThread().getContextClassLoader());
+
+		OperatorStateHandles stateHandles = runOperator(
+			taskConfiguration,
+			executionConfig,
+			new StreamMap<>(new StatefulMapper(true, false)),
+			keySelector,
+			stateBackend,
+			classLoader,
+			null,
+			inputs);
+
+		// run the program with classSourceB
+		rootPath = temporaryFolder.newFolder();
+
+		sourceFile = writeSourceFile(rootPath, POJO_NAME + ".java", classSourceB);
+		compileClass(sourceFile);
+
+		final ClassLoader classLoaderB = URLClassLoader.newInstance(
+			new URL[]{rootPath.toURI().toURL()},
+			Thread.currentThread().getContextClassLoader());
+
+		runOperator(
+			taskConfiguration,
+			executionConfig,
+			new StreamMap<>(new StatefulMapper(true, true)),
+			keySelector,
+			stateBackend,
+			classLoaderB,
+			stateHandles,
+			inputs);
+	}
+
+	private OperatorStateHandles runOperator(
+			Configuration taskConfiguration,
+			ExecutionConfig executionConfig,
+			OneInputStreamOperator<Long, Long> operator,
+			KeySelector<Long, Long> keySelector,
+			StateBackend stateBackend,
+			ClassLoader classLoader,
+			OperatorStateHandles operatorStateHandles,
+			Iterable<Long> input) throws Exception {
+
+		final MockEnvironment environment = new MockEnvironment(
+			"test task",
+			32 * 1024,
+			new MockInputSplitProvider(),
+			256,
+			taskConfiguration,
+			executionConfig,
+			16,
+			1,
+			0,
+			classLoader);
+
+		final KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			operator,
+			keySelector,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			environment);
+
+		harness.setStateBackend(stateBackend);
+
+		harness.setup();
+		harness.initializeState(operatorStateHandles);
+		harness.open();
+
+		long timestamp = 0L;
+
+		for (Long value : input) {
+			harness.processElement(value, timestamp++);
+		}
+
+
+		long checkpointId = 1L;
+		long checkpointTimestamp = timestamp + 1L;
+
+		OperatorStateHandles stateHandles = harness.snapshot(checkpointId, checkpointTimestamp);
+
+		harness.close();
+
+		return stateHandles;
+	}
+
+	private static File writeSourceFile(File root, String name, String source) throws IOException {
+		File sourceFile = new File(root, name);
+
+		sourceFile.getParentFile().mkdirs();
+
+		try (FileWriter writer = new FileWriter(sourceFile)) {
+			writer.write(source);
+		}
+
+		return sourceFile;
+	}
+
+	private static int compileClass(File sourceFile) {
+		JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+		return compiler.run(null, null, null, sourceFile.getPath());
+	}
+
+	private static final class StatefulMapper extends RichMapFunction<Long, Long> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = -520490739059396832L;
+
+		private final boolean keyed;
+		private final boolean verify;
+
+		private transient ValueState<Object> valueState;
+		private transient MapState<Object, Object> mapState;
+		private transient ListState<Object> listState;
+		private transient ReducingState<Object> reducingState;
+		private transient Class<?> pojoClass;
+		private transient Field fieldA;
+		private transient Field fieldB;
+
+		public StatefulMapper(boolean keyed, boolean verify) {
+			this.keyed = keyed;
+			this.verify = verify;
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			Object pojo = pojoClass.newInstance();
+
+			fieldA.set(pojo, value);
+			fieldB.set(pojo, value + "");
+
+			if (verify) {
+				assertEquals(pojo, valueState.value());
+
+				assertTrue(mapState.contains(pojo));
+				assertEquals(pojo, mapState.get(pojo));
+
+				Iterator<Object> listIterator = listState.get().iterator();
+
+				boolean elementFound = false;
+
+				while(listIterator.hasNext()) {
+					elementFound |= pojo.equals(listIterator.next());
+				}
+
+				assertTrue(elementFound);
+
+				assertEquals(pojo, reducingState.get());
+			} else {
+				valueState.update(pojo);
+				mapState.put(pojo, pojo);
+				listState.add(pojo);
+				reducingState.add(pojo);
+			}
+
+			return value;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
+
+			fieldA = pojoClass.getDeclaredField("a");
+			fieldB = pojoClass.getDeclaredField("b");
+			fieldA.setAccessible(true);
+			fieldB.setAccessible(true);
+
+			if (keyed) {
+				valueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("valueState", (Class<Object>) pojoClass));
+				mapState = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>("mapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
+				listState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("listState", (Class<Object>) pojoClass));
+
+				ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
+				reducingState = context.getKeyedStateStore().getReducingState(new ReducingStateDescriptor<>("reducingState", reduceFunction, (Class<Object>) pojoClass));
+			}
+		}
+	}
+
+	private static final class FirstValueReducer<T> implements ReduceFunction<T> {
+
+		private static final long serialVersionUID = -9222976423336835926L;
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+			return value1;
+		}
+	}
+
+	private static final class IdentityKeySelector<T> implements KeySelector<T, T> {
+
+		private static final long serialVersionUID = -3263628393881929147L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+}


[13/15] flink git commit: [FLINK-6804] [state] Consistent state migration behaviour across state backends

Posted by tz...@apache.org.
[FLINK-6804] [state] Consistent state migration behaviour across state backends

Prior to this commit, memory and non-memory state backends behaved
differently w.r.t. state migration. For the memory backends, we did
not require the new serializer to be compatible in order for the job to
proceed after restore, because all state has already been deserialized
to objects and the new serializer can always just be used as is.
Therefore, the compatibility checks were not performed for the memory
backends, resulting in different code paths between the different state
backends.

However, this inconsistent behaviour across backends will be confusing
for users. This commit adds the code path to check the newly registered
serializer's compatibility in the memory backends (even though it isn't
required), and deliberately fails the job if the new serializer is
incompatible.
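
For reference, the check now performed on restore in the memory backends
boils down to the following sketch (abbreviated from the
HeapKeyedStateBackend hunk below; the surrounding class and the method
name are only illustrative):

    // Abbreviated sketch of the compatibility check added on restore.
    private <N, V> void checkRestoredSerializer(
            RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo,
            RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) throws StateMigrationException {

        // compare the restored serializer (or its config snapshot) with the newly registered one
        CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
            restoredMetaInfo.getStateSerializer(),
            UnloadableDummyTypeSerializer.class,
            restoredMetaInfo.getStateSerializerConfigSnapshot(),
            newMetaInfo.getStateSerializer());

        if (stateCompatibility.isRequiresMigration()) {
            // the heap backends could proceed with the new serializer, but we fail
            // deliberately to mirror the behaviour of the RocksDB backend
            throw new StateMigrationException("State migration isn't supported, yet.");
        }
    }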

Note that the compatibility code paths will be truly unified and
required for all backends once we have eager state registration.

This closes #4073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0f2e99b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0f2e99b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0f2e99b

Branch: refs/heads/master
Commit: f0f2e99b6c829c4f4e2ca47c7647a64fe0c9d808
Parents: ae285f9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 22:40:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:35 2017 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompatibilityUtil.java |   8 +-
 .../state/DefaultOperatorStateBackend.java      | 119 +++++++++++++++----
 .../state/heap/HeapKeyedStateBackend.java       |  54 ++++++++-
 3 files changed, 151 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 94bb9bd..df7f216 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -66,13 +66,11 @@ public class CompatibilityUtil {
 			} else {
 				if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
 					// if the preceding serializer exists and is not a dummy, use
-					// that for converting instead of the provided convert deserializer
+					// that for converting instead of any provided convert deserializer
 					return CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
-				} else if (initialResult.getConvertDeserializer() != null) {
-					return initialResult;
 				} else {
-					throw new RuntimeException(
-						"State migration required, but there is no available serializer capable of reading previous data.");
+					// requires migration (may or may not have a convert deserializer)
+					return initialResult;
 				}
 			}
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 0f96dac..b16ac06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +95,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	 */
 	private final boolean asynchronousSnapshots;
 
+	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * <p>TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
+
+	/**
+	 * Cache of already accessed states.
+	 *
+	 * <p>In contrast to {@link #registeredStates} and {@link #restoredStateMetaInfos} which may be repopulated
+	 * with restored state, this map is always empty at the beginning.
+	 *
+	 * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
+	 */
+	private final HashMap<String, PartitionableListState<?>> accessedStatesByName;
+
 	public DefaultOperatorStateBackend(
 		ClassLoader userClassLoader,
 		ExecutionConfig executionConfig,
@@ -103,6 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		this.javaSerializer = new JavaSerializer<>();
 		this.registeredStates = new HashMap<>();
 		this.asynchronousSnapshots = asynchronousSnapshots;
+		this.accessedStatesByName = new HashMap<>();
+		this.restoredStateMetaInfos = new HashMap<>();
 	}
 
 	public ExecutionConfig getExecutionConfig() {
@@ -314,6 +339,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 							" not be loaded. This is a temporary restriction that will be fixed in future versions.");
 					}
 
+					restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 					PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());
 
 					if (null == listState) {
@@ -359,7 +386,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		/**
 		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+		private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
 
 		/**
 		 * The internal list the holds the elements of the state
@@ -389,12 +416,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
-		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
-			return stateMetaInfo;
+		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+			this.stateMetaInfo = stateMetaInfo;
 		}
 
-		public List<S> getInternalList() {
-			return internalList;
+		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+			return stateMetaInfo;
 		}
 
 		public PartitionableListState<S> deepCopy() {
@@ -441,19 +468,32 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	private <S> ListState<S> getListState(
-		ListStateDescriptor<S> stateDescriptor,
-		OperatorStateHandle.Mode mode) throws IOException {
+			ListStateDescriptor<S> stateDescriptor,
+			OperatorStateHandle.Mode mode) throws IOException, StateMigrationException {
+
 		Preconditions.checkNotNull(stateDescriptor);
+		String name = Preconditions.checkNotNull(stateDescriptor.getName());
 
-		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+		@SuppressWarnings("unchecked")
+		PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
+		if (previous != null) {
+			checkStateNameAndMode(previous.getStateMetaInfo(), name, mode);
+			return previous;
+		}
 
-		String name = Preconditions.checkNotNull(stateDescriptor.getName());
+		// end up here if it's the first-time access after execution for the
+		// provided state name; check compatibility of restored state, if any
+		// TODO with eager registration in place, these checks should be moved to restore()
+
+		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
 		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
 		@SuppressWarnings("unchecked")
 		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
 
 		if (null == partitionableListState) {
+			// no restored state for the state name; simply create new state holder
+
 			partitionableListState = new PartitionableListState<>(
 				new RegisteredOperatorBackendStateMetaInfo<>(
 					name,
@@ -462,21 +502,38 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 			registeredStates.put(name, partitionableListState);
 		} else {
-			// TODO with eager registration in place, these checks should be moved to restore()
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getName().equals(name),
-				"Incompatible state names. " +
-					"Was [" + partitionableListState.getStateMetaInfo().getName() + "], " +
-					"registered with [" + name + "].");
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
-				"Incompatible state assignment modes. " +
-					"Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
-					"registered with [" + mode + "].");
+			// has restored state; check compatibility of new state access
+
+			checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode);
+
+			@SuppressWarnings("unchecked")
+			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo =
+				(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredStateMetaInfos.get(name);
+
+			// check compatibility to determine if state migration is required
+			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getPartitionStateSerializer(),
+					UnloadableDummyTypeSerializer.class,
+					restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
+					partitionStateSerializer);
+
+			if (!stateCompatibility.isRequiresMigration()) {
+				// new serializer is compatible; use it to replace the old serializer
+				partitionableListState.setStateMetaInfo(
+					new RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode));
+			} else {
+				// TODO state migration currently isn't possible.
+
+				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
+				// since the state has already been deserialized to objects and we can just continue with
+				// the new serializer; we're deliberately failing here for now to have equal functionality with
+				// the RocksDB backend to avoid confusion for users.
+
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			}
 		}
 
+		accessedStatesByName.put(name, partitionableListState);
 		return partitionableListState;
 	}
 
@@ -497,4 +554,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			}
 		}
 	}
+
+	private static void checkStateNameAndMode(
+			RegisteredOperatorBackendStateMetaInfo previousMetaInfo,
+			String expectedName,
+			OperatorStateHandle.Mode expectedMode) {
+
+		Preconditions.checkState(
+			previousMetaInfo.getName().equals(expectedName),
+			"Incompatible state names. " +
+				"Was [" + previousMetaInfo.getName() + "], " +
+				"registered with [" + expectedName + "].");
+
+		Preconditions.checkState(
+			previousMetaInfo.getAssignmentMode().equals(expectedMode),
+			"Incompatible state assignment modes. " +
+				"Was [" + previousMetaInfo.getAssignmentMode() + "], " +
+				"registered with [" + expectedMode + "].");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2ab9691..35a70bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -36,6 +37,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
@@ -98,6 +100,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
 
 	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * <p>
+	 * TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+
+	/**
 	 * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying
 	 * {@link StateTable} implementation.
 	 */
@@ -115,6 +126,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 		this.asynchronousSnapshots = asynchronousSnapshots;
 		LOG.info("Initializing heap keyed state backend with stream factory.");
+
+		this.restoredKvStateMetaInfos = new HashMap<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -122,7 +135,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	// ------------------------------------------------------------------------
 
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
 
 		return tryRegisterStateTable(
 				stateDesc.getName(), stateDesc.getType(),
@@ -133,7 +146,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			String stateName,
 			StateDescriptor.Type stateType,
 			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<V> valueSerializer) {
+			TypeSerializer<V> valueSerializer) throws StateMigrationException {
 
 		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
 				new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
@@ -163,7 +176,36 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						"registered with [" + newMetaInfo.getStateType() + "].");
 			}
 
-			stateTable.setMetaInfo(newMetaInfo);
+			@SuppressWarnings("unchecked")
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
+
+			// check compatibility results to determine if state migration is required
+			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getNamespaceSerializer(),
+					MigrationNamespaceSerializerProxy.class,
+					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+					newMetaInfo.getNamespaceSerializer());
+
+			CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+					restoredMetaInfo.getStateSerializer(),
+					UnloadableDummyTypeSerializer.class,
+					restoredMetaInfo.getStateSerializerConfigSnapshot(),
+					newMetaInfo.getStateSerializer());
+
+			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
+				// new serializers are compatible; use them to replace the old serializers
+				stateTable.setMetaInfo(newMetaInfo);
+			} else {
+				// TODO state migration currently isn't possible.
+
+				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
+				// since the state has already been deserialized to objects and we can just continue with
+				// the new serializer; we're deliberately failing here for now to have equal functionality with
+				// the RocksDB backend to avoid confusion for users.
+
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			}
 		}
 
 		return stateTable;
@@ -427,6 +469,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							" in future versions.");
 					}
 
+					restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously
@@ -528,6 +572,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
 				final StateTable rawResultMap =
 						stateSnapshot.deserialize(stateName, this);
+
+				// mimic a restored kv state meta info
+				restoredKvStateMetaInfos.put(stateName, rawResultMap.getMetaInfo().snapshot());
+
 				// add named state to the backend
 				stateTables.put(stateName, rawResultMap);
 			} else {


[05/15] flink git commit: [FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

Posted by tz...@apache.org.
[FLINK-6883] [core] Refactor TypeSerializer to not implement TypeDeserializer

The separation of the TypeDeserializer interface from the TypeSerializer
base class is necessary because additionally implementing the
TypeDeserializer interface alters the generation order of anonymous
serializer classes for Scala case classes and collections.

Instead, the TypeDeserializer is now used as a mixin on the
TypeDeserializerAdapter utility, which now serves as a bridge for
both directions (i.e. TypeSerializer to TypeDeserializer, and vice
versa). No user interfaces are broken due to this change.
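
As a rough usage sketch (LongSerializer.INSTANCE merely stands in for
any existing serializer and is not part of this change):

    // Serializer -> deserializer direction: the adapter only forwards the
    // read-related methods (deserialize, getLength, duplicate, ...).
    TypeSerializer<Long> previousSerializer = LongSerializer.INSTANCE;
    TypeDeserializer<Long> readOnlyView = new TypeDeserializerAdapter<>(previousSerializer);

    // The new requiresMigration(TypeSerializer) overload relies on exactly
    // this wrapping internally.
    CompatibilityResult<Long> result = CompatibilityResult.requiresMigration(previousSerializer);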


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69fada3d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69fada3d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69fada3d

Branch: refs/heads/master
Commit: 69fada3d0b4c686f29c356f00eb49039f416879f
Parents: 8d0c4c0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 11 15:30:36 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:37:46 2017 +0200

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   | 24 +++++++++++-
 .../typeutils/TypeDeserializerAdapter.java      | 40 ++++++++++++++------
 .../api/common/typeutils/TypeSerializer.java    |  2 +-
 3 files changed, 51 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 4c83ded..1e05d57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -60,6 +60,8 @@ public final class CompatibilityResult<T> {
 	 * @param convertDeserializer the convert deserializer to use, in the case that the preceding serializer
 	 *                            cannot be found.
 	 *
+	 * @param <T> the type of the data being migrated.
+	 *
 	 * @return a result that signals migration is necessary, also providing a convert deserializer.
 	 */
 	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T> convertDeserializer) {
@@ -69,11 +71,29 @@ public final class CompatibilityResult<T> {
 	}
 
 	/**
+	 * Returns a result that signals migration to be performed, and in the case that the preceding serializer
+	 * cannot be found or restored to read the previous data during migration, a provided convert serializer
+	 * can be used. The provided serializer will only be used for deserialization.
+	 *
+	 * @param convertSerializer the convert serializer to use, in the case that the preceding serializer
+	 *                          cannot be found. The provided serializer will only be used for deserialization.
+	 *
+	 * @param <T> the type of the data being migrated.
+	 *
+	 * @return a result that signals migration is necessary, also providing a convert serializer.
+	 */
+	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeSerializer<T> convertSerializer) {
+		Preconditions.checkNotNull(convertSerializer, "Convert serializer cannot be null.");
+
+		return new CompatibilityResult<>(true, new TypeDeserializerAdapter<>(convertSerializer));
+	}
+
+	/**
 	 * Returns a result that signals migration to be performed. The migration will fail if the preceding
 	 * serializer for the previous data cannot be found.
 	 *
-	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)},
-	 * which will be used as a fallback resort in such cases.
+	 * <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)}
+	 * or {@link #requiresMigration(TypeSerializer)}, which will be used as a fallback resort in such cases.
 	 *
 	 * @return a result that signals migration is necessary, without providing a convert deserializer.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
index e02bed4..fb59602 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -25,27 +26,42 @@ import org.apache.flink.util.Preconditions;
 import java.io.IOException;
 
 /**
- * A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}.
+ * A utility class that is used to bridge a {@link TypeSerializer} and {@link TypeDeserializer}.
+ * It either wraps a type deserializer or serializer, and can only ever be used for deserialization
+ * (i.e. only read-related methods are functional).
  *
- * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer,
+ * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer or serializer,
  * while serialization methods are masked and not intended for use.
  *
  * @param <T> The data type that the deserializer deserializes.
  */
-public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+@Internal
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> implements TypeDeserializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	/** The actual wrapped deserializer instance */
+	/** The actual wrapped deserializer or serializer instance */
 	private final TypeDeserializer<T> deserializer;
+	private final TypeSerializer<T> serializer;
 
 	/**
-	 * Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}.
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeDeserializer}.
 	 *
 	 * @param deserializer the actual deserializer to wrap.
 	 */
 	public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
 		this.deserializer = Preconditions.checkNotNull(deserializer);
+		this.serializer = null;
+	}
+
+	/**
+	 * Creates a {@link TypeDeserializerAdapter} that wraps a {@link TypeSerializer}.
+	 *
+	 * @param serializer the actual serializer to wrap.
+	 */
+	public TypeDeserializerAdapter(TypeSerializer<T> serializer) {
+		this.deserializer = null;
+		this.serializer = Preconditions.checkNotNull(serializer);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -53,31 +69,31 @@ public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
 	// --------------------------------------------------------------------------------------------
 
 	public T deserialize(DataInputView source) throws IOException {
-		return deserializer.deserialize(source);
+		return (deserializer != null) ? deserializer.deserialize(source) : serializer.deserialize(source);
 	}
 
 	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return deserializer.deserialize(reuse, source);
+		return (deserializer != null) ? deserializer.deserialize(reuse, source) : serializer.deserialize(reuse, source);
 	}
 
 	public TypeSerializer<T> duplicate() {
-		return deserializer.duplicate();
+		return (deserializer != null) ? deserializer.duplicate() : serializer.duplicate();
 	}
 
 	public int getLength() {
-		return deserializer.getLength();
+		return (deserializer != null) ? deserializer.getLength() : serializer.getLength();
 	}
 
 	public boolean equals(Object obj) {
-		return deserializer.equals(obj);
+		return (deserializer != null) ? deserializer.equals(obj) : serializer.equals(obj);
 	}
 
 	public boolean canEqual(Object obj) {
-		return deserializer.canEqual(obj);
+		return (deserializer != null) ? deserializer.canEqual(obj) : serializer.canEqual(obj);
 	}
 
 	public int hashCode() {
-		return deserializer.hashCode();
+		return (deserializer != null) ? deserializer.hashCode() : serializer.hashCode();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/69fada3d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 85cbfdb..a606a18 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable {
+public abstract class TypeSerializer<T> implements Serializable {
 	
 	private static final long serialVersionUID = 1L;
 


[09/15] flink git commit: [FLINK-6803] [tests] Enhancements to PojoSerializerUpgradeTest

Posted by tz...@apache.org.
[FLINK-6803] [tests] Enhancements to PojoSerializerUpgradeTest

1. Allow tests to ignore missing fields.
2. Add equivalent tests which use POJOs as managed operator state.

For 2, all tests have to be ignored for now until FLINK-6804 is fixed.
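
For illustration, the widened test entry point is now called as follows
(values taken from the tests in the diff below):

    // testPojoSerializerUpgrade(classSourceA, classSourceB, hasBField, isKeyedState)
    testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, true);   // changed field order, keyed state
    testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);  // changed field order, operator state
    testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);  // field "b" removed, so do not touch it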


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c157d62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c157d62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c157d62

Branch: refs/heads/master
Commit: 7c157d624e38513055662491b3b13b4ceb7d3001
Parents: 61a45e4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 19:32:53 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../PojoSerializerUpgradeTest.java              | 210 +++++++++++++++----
 1 file changed, 164 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c157d62/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 2769c50..a925d43 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
@@ -169,21 +171,30 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		"@Override public String toString() {return \"(\" + a + \")\";}}";
 
 	/**
-	 * We should be able to handle a changed field order
+	 * We should be able to handle a changed field order of a POJO as keyed state
 	 */
 	@Test
-	public void testChangedFieldOrder() throws Exception {
-		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B);
+	public void testChangedFieldOrderWithKeyedState() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, true);
 	}
 
 	/**
-	 * Changing field types should require a state migration
+	 * We should be able to handle a changed field order of a POJO as operator state
 	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
-	public void testChangedFieldTypes() throws Exception {
+	public void testChangedFieldOrderWithOperatorState() throws Exception {
+		testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);
+	}
+
+	/**
+	 * Changing field types of a POJO as keyed state should require a state migration
+	 */
+	@Test
+	public void testChangedFieldTypesWithKeyedState() throws Exception {
 		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, true);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -195,13 +206,31 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Adding fields should require a state migration
+	 * Changing field types of a POJO as operator state should require a state migration
 	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
 	@Test
-	public void testAdditionalField() throws Exception {
+	public void testChangedFieldTypesWithOperatorState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, false);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Adding fields to a POJO as keyed state should require a state migration
+	 */
+	@Test
+	public void testAdditionalFieldWithKeyedState() throws Exception {
 		assumeTrue("Running only for RocksDBStateBackend until FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -213,13 +242,49 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Removing fields should require a state migration
+	 * Adding fields to a POJO as operator state should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
+	@Test
+	public void testAdditionalFieldWithOperatorState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields from a POJO as keyed state should require a state migration
 	 */
 	@Ignore("Ignore this test until FLINK-6801 has been fixed.")
 	@Test
-	public void testMissingField() throws Exception {
+	public void testMissingFieldWithKeyedState() throws Exception {
+		try {
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);
+			fail("Expected a state migration exception.");
+		} catch (Exception e) {
+			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
+				// StateMigrationException expected
+			} else {
+				throw e;
+			}
+		}
+	}
+
+	/**
+	 * Removing fields from a POJO as operator state should require a state migration
+	 */
+	@Ignore("Ignore this test until FLINK-6804 has been fixed.")
+	@Test
+	public void testMissingFieldWithOperatorState() throws Exception {
 		try {
-			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E);
+			testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false);
 			fail("Expected a state migration exception.");
 		} catch (Exception e) {
 			if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
@@ -230,7 +295,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		}
 	}
 
-	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB) throws Exception {
+	public void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
 		final Configuration taskConfiguration = new Configuration();
 		final ExecutionConfig executionConfig = new ExecutionConfig();
 		final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
@@ -248,8 +313,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		OperatorStateHandles stateHandles = runOperator(
 			taskConfiguration,
 			executionConfig,
-			new StreamMap<>(new StatefulMapper(true, false)),
+			new StreamMap<>(new StatefulMapper(isKeyedState, false, hasBField)),
 			keySelector,
+			isKeyedState,
 			stateBackend,
 			classLoader,
 			null,
@@ -268,8 +334,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		runOperator(
 			taskConfiguration,
 			executionConfig,
-			new StreamMap<>(new StatefulMapper(true, true)),
+			new StreamMap<>(new StatefulMapper(isKeyedState, true, hasBField)),
 			keySelector,
+			isKeyedState,
 			stateBackend,
 			classLoaderB,
 			stateHandles,
@@ -281,6 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			ExecutionConfig executionConfig,
 			OneInputStreamOperator<Long, Long> operator,
 			KeySelector<Long, Long> keySelector,
+			boolean isKeyedState,
 			StateBackend stateBackend,
 			ClassLoader classLoader,
 			OperatorStateHandles operatorStateHandles,
@@ -298,11 +366,17 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			0,
 			classLoader);
 
-		final KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> harness = new KeyedOneInputStreamOperatorTestHarness<>(
-			operator,
-			keySelector,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			environment);
+		OneInputStreamOperatorTestHarness<Long, Long> harness;
+
+		if (isKeyedState) {
+			harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				operator,
+				keySelector,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				environment);
+		} else {
+			harness = new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);
+		}
 
 		harness.setStateBackend(stateBackend);
 
@@ -350,18 +424,26 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 
 		private final boolean keyed;
 		private final boolean verify;
+		private final boolean hasBField;
+
+		// keyed states
+		private transient ValueState<Object> keyedValueState;
+		private transient MapState<Object, Object> keyedMapState;
+		private transient ListState<Object> keyedListState;
+		private transient ReducingState<Object> keyedReducingState;
+
+		// operator states
+		private transient ListState<Object> partitionableListState;
+		private transient ListState<Object> unionListState;
 
-		private transient ValueState<Object> valueState;
-		private transient MapState<Object, Object> mapState;
-		private transient ListState<Object> listState;
-		private transient ReducingState<Object> reducingState;
 		private transient Class<?> pojoClass;
 		private transient Field fieldA;
 		private transient Field fieldB;
 
-		public StatefulMapper(boolean keyed, boolean verify) {
+		public StatefulMapper(boolean keyed, boolean verify, boolean hasBField) {
 			this.keyed = keyed;
 			this.verify = verify;
+			this.hasBField = hasBField;
 		}
 
 		@Override
@@ -369,30 +451,54 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			Object pojo = pojoClass.newInstance();
 
 			fieldA.set(pojo, value);
-			fieldB.set(pojo, value + "");
+
+			if (hasBField) {
+				fieldB.set(pojo, value + "");
+			}
 
 			if (verify) {
-				assertEquals(pojo, valueState.value());
+				if (keyed) {
+					assertEquals(pojo, keyedValueState.value());
 
-				assertTrue(mapState.contains(pojo));
-				assertEquals(pojo, mapState.get(pojo));
+					assertTrue(keyedMapState.contains(pojo));
+					assertEquals(pojo, keyedMapState.get(pojo));
 
-				Iterator<Object> listIterator = listState.get().iterator();
+					Iterator<Object> listIterator = keyedListState.get().iterator();
 
-				boolean elementFound = false;
+					boolean elementFound = false;
 
-				while(listIterator.hasNext()) {
-					elementFound |= pojo.equals(listIterator.next());
-				}
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+
+					assertTrue(elementFound);
 
-				assertTrue(elementFound);
+					assertEquals(pojo, keyedReducingState.get());
+				} else {
+					boolean elementFound = false;
+					Iterator<Object> listIterator = partitionableListState.get().iterator();
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+					assertTrue(elementFound);
 
-				assertEquals(pojo, reducingState.get());
+					elementFound = false;
+					listIterator = unionListState.get().iterator();
+					while (listIterator.hasNext()) {
+						elementFound |= pojo.equals(listIterator.next());
+					}
+					assertTrue(elementFound);
+				}
 			} else {
-				valueState.update(pojo);
-				mapState.put(pojo, pojo);
-				listState.add(pojo);
-				reducingState.add(pojo);
+				if (keyed) {
+					keyedValueState.update(pojo);
+					keyedMapState.put(pojo, pojo);
+					keyedListState.add(pojo);
+					keyedReducingState.add(pojo);
+				} else {
+					partitionableListState.add(pojo);
+					unionListState.add(pojo);
+				}
 			}
 
 			return value;
@@ -408,17 +514,29 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
 
 			fieldA = pojoClass.getDeclaredField("a");
-			fieldB = pojoClass.getDeclaredField("b");
 			fieldA.setAccessible(true);
-			fieldB.setAccessible(true);
+
+			if (hasBField) {
+				fieldB = pojoClass.getDeclaredField("b");
+				fieldB.setAccessible(true);
+			}
 
 			if (keyed) {
-				valueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("valueState", (Class<Object>) pojoClass));
-				mapState = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>("mapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
-				listState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("listState", (Class<Object>) pojoClass));
+				keyedValueState = context.getKeyedStateStore().getState(
+					new ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
+				keyedMapState = context.getKeyedStateStore().getMapState(
+					new MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
+				keyedListState = context.getKeyedStateStore().getListState(
+					new ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));
 
 				ReduceFunction<Object> reduceFunction = new FirstValueReducer<>();
-				reducingState = context.getKeyedStateStore().getReducingState(new ReducingStateDescriptor<>("reducingState", reduceFunction, (Class<Object>) pojoClass));
+				keyedReducingState = context.getKeyedStateStore().getReducingState(
+					new ReducingStateDescriptor<>("keyedReducingState", reduceFunction, (Class<Object>) pojoClass));
+			} else {
+				partitionableListState = context.getOperatorStateStore().getListState(
+					new ListStateDescriptor<>("partitionableListState", (Class<Object>) pojoClass));
+				unionListState = context.getOperatorStateStore().getUnionListState(
+					new ListStateDescriptor<>("unionListState", (Class<Object>) pojoClass));
 			}
 		}
 	}


[07/15] flink git commit: [FLINK-6796] [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness

Posted by tz...@apache.org.
[FLINK-6796] [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness

Generalize KeyedOneInputStreamOperatorTestHarness

Generalize AbstractStreamOperatorTestHarness
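
A sketch of how a test can now inject its own user-code class loader
(rootPath and operator are placeholders from the calling test; the
sizes mirror the existing tests):

    // Build a class loader that can see freshly compiled test classes.
    ClassLoader userCodeClassLoader = URLClassLoader.newInstance(
        new URL[]{rootPath.toURI().toURL()},
        Thread.currentThread().getContextClassLoader());

    // MockEnvironment accepts the class loader and returns it from getUserClassLoader().
    MockEnvironment environment = new MockEnvironment(
        "test task",
        32 * 1024,
        new MockInputSplitProvider(),
        256,
        new Configuration(),
        new ExecutionConfig(),
        16,  // max parallelism
        1,   // parallelism
        0,   // subtask index
        userCodeClassLoader);

    // The harnesses take the environment directly and pick up its class loader.
    OneInputStreamOperatorTestHarness<Long, Long> harness =
        new OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, environment);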


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e35c575a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e35c575a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e35c575a

Branch: refs/heads/master
Commit: e35c575ac41f2bdb3855170be883d0e21aa0379e
Parents: 7aad0ec
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 31 15:14:11 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:16 2017 +0200

----------------------------------------------------------------------
 .../operators/testutils/MockEnvironment.java    | 43 +++++++++++++++++++-
 .../util/AbstractStreamOperatorTestHarness.java | 18 ++++----
 .../KeyedOneInputStreamOperatorTestHarness.java | 16 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java |  7 +---
 4 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 49175c7..4f0242e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -97,12 +98,23 @@ public class MockEnvironment implements Environment {
 
 	private final int bufferSize;
 
+	private final ClassLoader userCodeClassLoader;
+
 	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration(), new ExecutionConfig());
 	}
 
 	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig) {
-		this(taskName, memorySize, inputSplitProvider, bufferSize, taskConfiguration, executionConfig, 1, 1, 0);
+		this(
+			taskName,
+			memorySize,
+			inputSplitProvider,
+			bufferSize,
+			taskConfiguration,
+			executionConfig,
+			1,
+			1,
+			0);
 	}
 
 	public MockEnvironment(
@@ -115,6 +127,31 @@ public class MockEnvironment implements Environment {
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex) {
+		this(
+			taskName,
+			memorySize,
+			inputSplitProvider,
+			bufferSize,
+			taskConfiguration,
+			executionConfig,
+			maxParallelism,
+			parallelism,
+			subtaskIndex,
+			Thread.currentThread().getContextClassLoader());
+
+	}
+
+	public MockEnvironment(
+			String taskName,
+			long memorySize,
+			MockInputSplitProvider inputSplitProvider,
+			int bufferSize,
+			Configuration taskConfiguration,
+			ExecutionConfig executionConfig,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			ClassLoader userCodeClassLoader) {
 		this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0);
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = taskConfiguration;
@@ -131,6 +168,8 @@ public class MockEnvironment implements Environment {
 
 		KvStateRegistry registry = new KvStateRegistry();
 		this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId());
+
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 	}
 
 
@@ -254,7 +293,7 @@ public class MockEnvironment implements Environment {
 
 	@Override
 	public ClassLoader getUserClassLoader() {
-		return getClass().getClassLoader();
+		return userCodeClassLoader;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 0a517f0..6f2d349 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -109,7 +110,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	CloseableRegistry closableRegistry;
 
 	// use this as default for tests
-	protected AbstractStateBackend stateBackend = new MemoryStateBackend();
+	protected StateBackend stateBackend = new MemoryStateBackend();
 
 	private final Object checkpointLock;
 
@@ -132,9 +133,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 		this(
 			operator,
-			maxParallelism,
-			numSubtasks,
-			subtaskIndex,
 			new MockEnvironment(
 				"MockTask",
 				3 * 1024 * 1024,
@@ -149,9 +147,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
-			int maxParallelism,
-			int numSubtasks,
-			int subtaskIndex,
 			final Environment environment) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
@@ -192,7 +187,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
+		when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
 		when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 
@@ -226,8 +221,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 					OperatorStateBackend osb;
 
 					osb = stateBackend.createOperatorStateBackend(
-							environment,
-							operator.getClass().getSimpleName());
+						environment,
+						operator.getClass().getSimpleName());
 
 					mockTask.getCancelables().registerClosable(osb);
 
@@ -248,9 +243,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 				return processingTimeService;
 			}
 		}).when(mockTask).getProcessingTimeService();
+
 	}
 
-	public void setStateBackend(AbstractStateBackend stateBackend) {
+	public void setStateBackend(StateBackend stateBackend) {
 		this.stateBackend = stateBackend;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 8f4908a..0d42d9f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -92,6 +93,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		this(operator, keySelector, keyType, 1, 1, 0);
 	}
 
+	public KeyedOneInputStreamOperatorTestHarness(
+			final OneInputStreamOperator<IN, OUT> operator,
+			final  KeySelector<IN, K> keySelector,
+			final TypeInformation<K> keyType,
+			final Environment environment) throws Exception {
+
+		super(operator, environment);
+
+		ClosureCleaner.clean(keySelector, false);
+		config.setStatePartitioner(0, keySelector);
+		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+
+		setupMockTaskCreateKeyedBackend();
+	}
+
 	private void setupMockTaskCreateKeyedBackend() {
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 652d016..8a0996f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -53,7 +53,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		OneInputStreamOperator<IN, OUT> operator,
 		TypeSerializer<IN> typeSerializerIn,
 		Environment environment) throws Exception {
-		this(operator, 1, 1, 0, environment);
+		this(operator, environment);
 
 		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
 	}
@@ -74,11 +74,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
 	public OneInputStreamOperatorTestHarness(
 		OneInputStreamOperator<IN, OUT> operator,
-		int maxParallelism,
-		int numTubtasks,
-		int subtaskIndex,
 		Environment environment) throws Exception {
-		super(operator, maxParallelism, numTubtasks, subtaskIndex, environment);
+		super(operator, environment);
 
 		this.oneInputOperator = operator;
 	}


[04/15] flink git commit: [hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in CEP

Posted by tz...@apache.org.
[hotfix] [cep] Fix incorrect CompatibilityResult.requiresMigration calls in CEP


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d0c4c04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d0c4c04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d0c4c04

Branch: refs/heads/master
Commit: 8d0c4c0405a24e087e66d3a4bf07d1105eda2fc9
Parents: b216a4a
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 11 11:02:38 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:37:45 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/typeutils/CompatibilityResult.java | 6 ++++--
 .../flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java  | 2 +-
 .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java   | 2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d0c4c04/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index 5ad0b5e..4c83ded 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.common.typeutils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 /**
  * A {@code CompatibilityResult} contains information about whether or not data migration
  * is required in order to continue using new serializers for previously serialized data.
@@ -60,10 +62,10 @@ public final class CompatibilityResult<T> {
 	 *
 	 * @return a result that signals migration is necessary, also providing a convert deserializer.
 	 */
-	public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
+	public static <T> CompatibilityResult<T> requiresMigration(@Nonnull TypeDeserializer<T> convertDeserializer) {
 		Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null.");
 
-		return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer));
+		return new CompatibilityResult<>(true, convertDeserializer);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8d0c4c04/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index cac1601..a6c5bde 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -1108,7 +1108,7 @@ public class NFA<T> implements Serializable {
 				}
 			}
 
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 
 		private void serializeStates(Set<State<T>> states, DataOutputView out) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/8d0c4c04/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index d592c65..5947465 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -1163,7 +1163,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 				}
 			}
 
-			return CompatibilityResult.requiresMigration(null);
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 

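The change above matters because the one-argument factory now rejects null convert deserializers, so callers must choose the no-arg variant when no such deserializer is available. Below is a rough, hedged sketch (not Flink code; the helper class and method name are made up for illustration) of how a caller that may or may not have a convert deserializer at hand would pick between the two factories, assuming the CompatibilityResult and TypeDeserializer types shown in the diff above:

import javax.annotation.Nullable;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeDeserializer;

public final class MigrationResultSketch {

    private MigrationResultSketch() {}

    /** Chooses the appropriate factory; never passes null to the one-arg variant. */
    public static <T> CompatibilityResult<T> migrationResult(@Nullable TypeDeserializer<T> convertDeserializer) {
        if (convertDeserializer != null) {
            // the one-arg factory requires a non-null convert deserializer
            return CompatibilityResult.requiresMigration(convertDeserializer);
        }
        // no convert deserializer available: signal migration without one
        return CompatibilityResult.requiresMigration();
    }
}
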

[12/15] flink git commit: [FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

Posted by tz...@apache.org.
[FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

Prior to this commit, when reading the PojoSerializerConfigSnapshot, if
the underlying POJO type has a missing field, then the read would fail.
Failing the deserialization of the config snapshot is too severe,
because that would leave no opportunity to restore the checkpoint at
all, whereas we should be able to restore the config and provide it to
the new PojoSerializer for the chance of getting a convert deserializer.

This commit changes this by only restoring the field names when reading
the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
the field name is used to lookup the fields of the new PojoSerializer.
This change does not change the serialization format of the
PojoSerializerConfigSnapshot.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c929eb30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c929eb30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c929eb30

Branch: refs/heads/master
Commit: c929eb30867bb1f539c98fe9e47f91790bd85764
Parents: 7c157d6
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 12:30:58 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 65 +++++++-------------
 .../typeutils/runtime/PojoSerializerTest.java   | 18 +++---
 2 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 2311158..7818897 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 						(TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields];
 
 					int i = 0;
-					for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
+					for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry
 							: config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 						int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey());
 						if (fieldIndex != -1) {
-							reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();
+							reorderedFields[i] = fields[fieldIndex];
 
 							compatResult = CompatibilityUtil.resolveCompatibilityResult(
-									fieldToConfigSnapshotEntry.getValue().f0,
-									UnloadableDummyTypeSerializer.class,
-									fieldToConfigSnapshotEntry.getValue().f1,
-									fieldSerializers[fieldIndex]);
+								fieldToConfigSnapshotEntry.getValue().f0,
+								UnloadableDummyTypeSerializer.class,
+								fieldToConfigSnapshotEntry.getValue().f1,
+								fieldSerializers[fieldIndex]);
 
 							if (compatResult.isRequiresMigration()) {
 								requiresMigration = true;
@@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		 * may reorder the fields in case they are different. The order of the fields need to
 		 * stay the same for binary compatibility, as the field order is part of the serialization format.
 		 */
-		private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
+		private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
 		/**
 		 * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots.
@@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
@@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 		public PojoSerializerConfigSnapshot(
 				Class<T> pojoType,
-				LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+				LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
 				LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots,
 				HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots,
 				boolean ignoreTypeSerializerSerialization) {
@@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 				// --- write fields and their serializers, in order
 
 				out.writeInt(fieldToSerializerConfigSnapshot.size());
-				for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+				for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 						: fieldToSerializerConfigSnapshot.entrySet()) {
 
-					outViewWrapper.writeUTF(entry.getKey().getName());
+					outViewWrapper.writeUTF(entry.getKey());
 
 					out.writeInt(outWithPos.getPosition());
 					if (!ignoreTypeSerializerSerialization) {
@@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 				this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields);
 				String fieldName;
-				Field field;
 				TypeSerializer<?> fieldSerializer;
 				TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot;
 				for (int i = 0; i < numFields; i++) {
 					fieldName = inViewWrapper.readUTF();
 
-					// search all superclasses for the field
-					Class<?> clazz = getTypeClass();
-					field = null;
-					while (clazz != null) {
-						try {
-							field = clazz.getDeclaredField(fieldName);
-							field.setAccessible(true);
-							break;
-						} catch (NoSuchFieldException e) {
-							clazz = clazz.getSuperclass();
-						}
-					}
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+					fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
 
-					if (field == null) {
-						// the field no longer exists in the POJO
-						throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName());
-					} else {
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-						fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader());
-
-						inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-						fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
+					inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+					fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader());
 
-						fieldToSerializerConfigSnapshot.put(
-							field,
-							new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
-					}
+					fieldToSerializerConfigSnapshot.put(
+						fieldName,
+						new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
 				}
 
 				// --- read registered subclasses and their serializers, in registration order
@@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			return VERSION;
 		}
 
-		public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
+		public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
 			return fieldToSerializerConfigSnapshot;
 		}
 
@@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	 * Finds and returns the order (0-based) of a POJO field.
 	 * Returns -1 if the field does not exist for this POJO.
 	 */
-	private int findField(Field f) {
+	private int findField(String fieldName) {
 		int foundIndex = 0;
 		for (Field field : fields) {
-			if (f.equals(field)) {
+			if (fieldName.equals(field.getName())) {
 				return foundIndex;
 			}
 
@@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			TypeSerializer<?>[] fieldSerializers,
 			HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) {
 
-		final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
+		final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
 			new LinkedHashMap<>(fields.length);
 
 		for (int i = 0; i < fields.length; i++) {
 			fieldToSerializerConfigSnapshots.put(
-				fields[i],
+				fields[i].getName(),
 				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 10f4708..e5315aa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -484,35 +484,35 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 		// creating this serializer just for generating config snapshots of the field serializers
 		PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
 			new LinkedHashMap<>(mockOriginalFieldOrder.length);
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[0],
+			mockOriginalFieldOrder[0].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[3],
 				ser.getFieldSerializers()[3].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[1],
+			mockOriginalFieldOrder[1].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[2],
 				ser.getFieldSerializers()[2].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[2],
+			mockOriginalFieldOrder[2].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[5],
 				ser.getFieldSerializers()[5].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[3],
+			mockOriginalFieldOrder[3].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[0],
 				ser.getFieldSerializers()[0].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[4],
+			mockOriginalFieldOrder[4].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[1],
 				ser.getFieldSerializers()[1].snapshotConfiguration()));
 		mockOriginalFieldToSerializerConfigSnapshot.put(
-			mockOriginalFieldOrder[5],
+			mockOriginalFieldOrder[5].getName(),
 			new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
 				ser.getFieldSerializers()[4],
 				ser.getFieldSerializers()[4].snapshotConfiguration()));
@@ -579,9 +579,9 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
 			PojoSerializer.PojoSerializerConfigSnapshot<?> original,
 			PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
 
-		LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+		LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
 				original.getFieldToSerializerConfigSnapshot();
-		for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
+		for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
 				: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
 			Assert.assertEquals(null, entry.getValue().f0);


[06/15] flink git commit: [FLINK-6883] [tests] Add migration tests for Scala jobs

Posted by tz...@apache.org.
[FLINK-6883] [tests] Add migration tests for Scala jobs

This commit adds migration ITCases for jobs written using the Scala API.
An extra concern for migrating Scala jobs is that Scala case classes
and collections use anonymously generated serializer classes, which may
affect state restore.

This closes #4103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aad0ecd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aad0ecd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aad0ecd

Branch: refs/heads/master
Commit: 7aad0ecd80f3c21fd21991e4c29d42a7802d95b9
Parents: 69fada3d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 11 15:31:42 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:01 2017 +0200

----------------------------------------------------------------------
 .../api/scala/typeutils/OptionTypeInfo.scala    |   5 +-
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 ...gration-itcase-flink1.2-jobmanager-savepoint | Bin 0 -> 92818 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 92818 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../_metadata                                   | Bin 0 -> 213855 bytes
 .../scala/migration/MigrationTestTypes.scala    |  28 ++
 .../ScalaSerializersMigrationTest.scala         | 110 +++++++
 .../StatefulJobSavepointMigrationITCase.scala   | 303 +++++++++++++++++++
 12 files changed, 445 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 73fe580..d2e66a5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.flink.annotation.{Public, PublicEvolving}
+import org.apache.flink.annotation.{Public, PublicEvolving, VisibleForTesting}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -85,4 +85,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
   override def hashCode: Int = {
     elemTypeInfo.hashCode()
   }
+
+  @VisibleForTesting
+  def getElemTypeInfo: TypeInformation[A] = elemTypeInfo
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..3d0f8c5
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-jobmanager-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..5a763df
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.2-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..e183e51
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..612bc1b
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint
new file mode 100644
index 0000000..9b90ac8
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-jobmanager-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint
new file mode 100644
index 0000000..99777a1
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.2-rocksdb-savepoint differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
new file mode 100644
index 0000000..6adf433
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..d9eaa72
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
new file mode 100644
index 0000000..4ae57c4
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/MigrationTestTypes.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+case class CustomCaseClass(a: String, b: Long)
+
+case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
+
+object CustomEnum extends Enumeration {
+  type CustomEnum = Value
+  val ONE, TWO, THREE, FOUR = Value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
new file mode 100644
index 0000000..a2edc90
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/ScalaSerializersMigrationTest.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils._
+import org.junit.{Assert, Test}
+
+import scala.util.Try
+
+class ScalaSerializersMigrationTest {
+
+  /**
+   * Verifies that the generated classnames for anonymous Scala serializers remain the same.
+   *
+   * The classnames in this test are collected from running the same type information generation
+   * code in previous version branches. They should not change across different Flink versions.
+   */
+  @Test
+  def testStableAnonymousClassnameGeneration(): Unit = {
+    val caseClassInfo = createTypeInformation[CustomCaseClass]
+    val caseClassWithNestingInfo =
+      createTypeInformation[CustomCaseClassWithNesting]
+        .asInstanceOf[CaseClassTypeInfo[_]]
+    val traversableInfo =
+      createTypeInformation[List[CustomCaseClass]]
+        .asInstanceOf[TraversableTypeInfo[_,_]]
+    val tryInfo =
+      createTypeInformation[Try[CustomCaseClass]]
+        .asInstanceOf[TryTypeInfo[_,_]]
+    val optionInfo =
+      createTypeInformation[Option[CustomCaseClass]]
+        .asInstanceOf[OptionTypeInfo[_,_]]
+    val eitherInfo =
+      createTypeInformation[Either[CustomCaseClass, String]]
+        .asInstanceOf[EitherTypeInfo[_,_,_]]
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8",
+      caseClassInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$8$$anon$1",
+      caseClassInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9",
+      caseClassWithNestingInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$3",
+      caseClassWithNestingInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10",
+      caseClassWithNestingInfo.getTypeAt("nested").getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$9$$anon$10$$anon$2",
+      caseClassWithNestingInfo.getTypeAt("nested")
+        .createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16",
+      traversableInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$16$$anon$12",
+      traversableInfo.createSerializer(new ExecutionConfig).getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11",
+      traversableInfo.elementTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$11$$anon$4",
+      traversableInfo.elementTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13",
+      tryInfo.elemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$13$$anon$5",
+      tryInfo.elemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14",
+      optionInfo.getElemTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$14$$anon$6",
+      optionInfo.getElemTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15",
+      eitherInfo.leftTypeInfo.getClass.getName)
+    Assert.assertEquals(
+      "org.apache.flink.api.scala.migration.ScalaSerializersMigrationTest$$anon$15$$anon$7",
+      eitherInfo.leftTypeInfo.createSerializer(new ExecutionConfig).getClass.getName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7aad0ecd/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
new file mode 100644
index 0000000..1e67042
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/migration/StatefulJobSavepointMigrationITCase.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.migration
+
+import java.util
+
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.runtime.state.{AbstractStateBackend, FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.migration.CustomEnum.CustomEnum
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.streaming.util.migration.MigrationVersion
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assume, Ignore, Test}
+
+import scala.util.{Failure, Properties, Try}
+
+object StatefulJobSavepointMigrationITCase {
+
+  @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+  def parameters: util.Collection[(MigrationVersion, String)] = {
+    util.Arrays.asList(
+      (MigrationVersion.v1_2, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_2, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.MEMORY_STATE_BACKEND_NAME),
+      (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME))
+  }
+
+  // TODO to generate savepoints for a specific Flink version / backend type,
+  // TODO change these values accordingly, e.g. to generate for 1.3 with RocksDB,
+  // TODO set as (MigrationVersion.v1_3, AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME)
+  val GENERATE_SAVEPOINT_VER: MigrationVersion = MigrationVersion.v1_3
+  val GENERATE_SAVEPOINT_BACKEND_TYPE: String = AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME
+
+  val SCALA_VERSION: String = {
+    val versionString = Properties.versionString.split(" ")(1)
+    versionString.substring(0, versionString.lastIndexOf("."))
+  }
+
+  val NUM_ELEMENTS = 4
+}
+
+/**
+ * ITCase for migration Scala state types across different Flink versions.
+ */
+@RunWith(classOf[Parameterized])
+class StatefulJobSavepointMigrationITCase(
+    migrationVersionAndBackend: (MigrationVersion, String))
+  extends SavepointMigrationTestBase with Serializable {
+
+  @Ignore
+  @Test
+  def testCreateSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    executeAndSavepoint(
+      env,
+      s"src/test/resources/stateful-scala" +
+        s"${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+        s"-udf-migration-itcase-flink" +
+        s"${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_VER}" +
+        s"-${StatefulJobSavepointMigrationITCase.GENERATE_SAVEPOINT_BACKEND_TYPE}-savepoint",
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS
+      )
+    )
+  }
+
+  @Test
+  def testRestoreSavepoint(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    migrationVersionAndBackend._2 match {
+      case AbstractStateBackend.ROCKSDB_STATE_BACKEND_NAME =>
+        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()))
+      case AbstractStateBackend.MEMORY_STATE_BACKEND_NAME =>
+        env.setStateBackend(new MemoryStateBackend())
+      case _ => throw new UnsupportedOperationException
+    }
+
+    env.setStateBackend(new MemoryStateBackend)
+    env.enableCheckpointing(500)
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    env
+      .addSource(
+        new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
+      .keyBy(
+        new KeySelector[(Long, Long), Long] {
+          override def getKey(value: (Long, Long)): Long = value._1
+        }
+      )
+      .flatMap(new StatefulFlatMapper)
+      .addSink(new AccumulatorCountingSink)
+
+    restoreAndExecute(
+      env,
+      SavepointMigrationTestBase.getResourceFilename(
+        s"stateful-scala${StatefulJobSavepointMigrationITCase.SCALA_VERSION}" +
+          s"-udf-migration-itcase-flink${migrationVersionAndBackend._1}" +
+          s"-${migrationVersionAndBackend._2}-savepoint"),
+      new Tuple2(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR,
+        StatefulJobSavepointMigrationITCase.NUM_ELEMENTS)
+    )
+  }
+
+  @SerialVersionUID(1L)
+  private object CheckpointedSource {
+    var CHECKPOINTED_STRING = "Here be dragons!"
+  }
+
+  @SerialVersionUID(1L)
+  private class CheckpointedSource(val numElements: Int)
+      extends SourceFunction[(Long, Long)] with CheckpointedFunction {
+
+    private var isRunning = true
+    private var state: ListState[CustomCaseClass] = _
+
+    @throws[Exception]
+    override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
+      ctx.emitWatermark(new Watermark(0))
+      ctx.getCheckpointLock synchronized {
+        var i = 0
+        while (i < numElements) {
+            ctx.collect(i, i)
+            i += 1
+        }
+      }
+      // don't emit a final watermark so that we don't trigger the registered event-time
+      // timers
+      while (isRunning) Thread.sleep(20)
+    }
+
+    def cancel() {
+      isRunning = false
+    }
+
+    override def initializeState(context: FunctionInitializationContext): Unit = {
+      state = context.getOperatorStateStore.getOperatorState(
+        new ListStateDescriptor[CustomCaseClass](
+          "sourceState", createTypeInformation[CustomCaseClass]))
+    }
+
+    override def snapshotState(context: FunctionSnapshotContext): Unit = {
+      state.clear()
+      state.add(CustomCaseClass("Here be dragons!", 123))
+    }
+  }
+
+  @SerialVersionUID(1L)
+  private object AccumulatorCountingSink {
+    var NUM_ELEMENTS_ACCUMULATOR = classOf[AccumulatorCountingSink[_]] + "_NUM_ELEMENTS"
+  }
+
+  @SerialVersionUID(1L)
+  private class AccumulatorCountingSink[T] extends RichSinkFunction[T] {
+
+    private var count: Int = 0
+
+    @throws[Exception]
+    override def open(parameters: Configuration) {
+      super.open(parameters)
+      getRuntimeContext.addAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, new IntCounter)
+    }
+
+    @throws[Exception]
+    def invoke(value: T) {
+      count += 1
+      getRuntimeContext.getAccumulator(
+        AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR).add(1)
+    }
+  }
+
+  class StatefulFlatMapper extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+    private var caseClassState: ValueState[CustomCaseClass] = _
+    private var caseClassWithNestingState: ValueState[CustomCaseClassWithNesting] = _
+    private var collectionState: ValueState[List[CustomCaseClass]] = _
+    private var tryState: ValueState[Try[CustomCaseClass]] = _
+    private var tryFailureState: ValueState[Try[CustomCaseClass]] = _
+    private var optionState: ValueState[Option[CustomCaseClass]] = _
+    private var optionNoneState: ValueState[Option[CustomCaseClass]] = _
+    private var eitherLeftState: ValueState[Either[CustomCaseClass, String]] = _
+    private var eitherRightState: ValueState[Either[CustomCaseClass, String]] = _
+    private var enumOneState: ValueState[CustomEnum] = _
+    private var enumThreeState: ValueState[CustomEnum] = _
+
+    override def open(parameters: Configuration): Unit = {
+      caseClassState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClass](
+          "caseClassState", createTypeInformation[CustomCaseClass]))
+      caseClassWithNestingState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomCaseClassWithNesting](
+          "caseClassWithNestingState", createTypeInformation[CustomCaseClassWithNesting]))
+      collectionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[List[CustomCaseClass]](
+          "collectionState", createTypeInformation[List[CustomCaseClass]]))
+      tryState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryState", createTypeInformation[Try[CustomCaseClass]]))
+      tryFailureState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Try[CustomCaseClass]](
+          "tryFailureState", createTypeInformation[Try[CustomCaseClass]]))
+      optionState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionState", createTypeInformation[Option[CustomCaseClass]]))
+      optionNoneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Option[CustomCaseClass]](
+          "optionNoneState", createTypeInformation[Option[CustomCaseClass]]))
+      eitherLeftState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherLeftState", createTypeInformation[Either[CustomCaseClass, String]]))
+      eitherRightState = getRuntimeContext.getState(
+        new ValueStateDescriptor[Either[CustomCaseClass, String]](
+          "eitherRightState", createTypeInformation[Either[CustomCaseClass, String]]))
+      enumOneState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumOneState", createTypeInformation[CustomEnum]))
+      enumThreeState = getRuntimeContext.getState(
+        new ValueStateDescriptor[CustomEnum](
+          "enumThreeState", createTypeInformation[CustomEnum]))
+    }
+
+    override def flatMap(in: (Long, Long), collector: Collector[(Long, Long)]): Unit = {
+      caseClassState.update(CustomCaseClass(in._1.toString, in._2 * 2))
+      caseClassWithNestingState.update(
+        CustomCaseClassWithNesting(in._1, CustomCaseClass(in._1.toString, in._2 * 2)))
+      collectionState.update(List(CustomCaseClass(in._1.toString, in._2 * 2)))
+      tryState.update(Try(CustomCaseClass(in._1.toString, in._2 * 5)))
+      tryFailureState.update(Failure(new RuntimeException))
+      optionState.update(Some(CustomCaseClass(in._1.toString, in._2 * 2)))
+      optionNoneState.update(None)
+      eitherLeftState.update(Left(CustomCaseClass(in._1.toString, in._2 * 2)))
+      eitherRightState.update(Right((in._1 * 3).toString))
+      enumOneState.update(CustomEnum.ONE)
+      enumOneState.update(CustomEnum.THREE)
+
+      collector.collect(in)
+    }
+  }
+
+}


[11/15] flink git commit: [FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields

Posted by tz...@apache.org.
[FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields

Prior to this commit, deserializing the PojoSerializer would fail when
we encounter a missing field that existed in the POJO type before. It is
actually perfectly fine to have a missing field; the deserialized
PojoSerializer should simply skip reading the removed field's previously
serialized values, i.e. much like how Java Object Serialization works.

This commit relaxes the deserialization of the PojoSerializer, so that a
null will be used as a placeholder value to indicate a removed field
that previously existed. De-/serialization and copying methods on the
PojoSerializer will respect null Fields and simply skip them.
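
As a standalone illustration of that skipping behaviour (a hedged sketch using plain java.io streams instead of Flink's DataOutputView/DataInputView; the class is made up for this example), a null slot in the field array stands in for a removed field, and its serialized value is read and discarded so the rest of the record stays aligned:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public final class SkipRemovedFieldSketch {

    public static void main(String[] args) throws Exception {
        // record written with two fields: "a" (int) and "b" (int), each preceded by a null flag
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(false); // "a" is not null
            out.writeInt(42);
            out.writeBoolean(false); // "b" is not null
            out.writeInt(7);
        }

        // restoring side: "b" was removed from the POJO, marked here by a null slot
        String[] fields = {"a", null};
        int[] restored = new int[fields.length];

        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            for (int i = 0; i < fields.length; i++) {
                boolean isNull = in.readBoolean();
                if (fields[i] != null) {
                    restored[i] = isNull ? 0 : in.readInt();
                } else if (!isNull) {
                    in.readInt(); // read and drop the value of the removed field
                }
            }
        }

        System.out.println("a = " + restored[0]); // prints 42; the removed field's value 7 was skipped
    }
}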


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae285f9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae285f9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae285f9b

Branch: refs/heads/master
Commit: ae285f9bd5398fe4d8d86eb3207bbc5beb8a24c8
Parents: c929eb3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 20:41:59 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/FieldSerializer.java |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  | 109 +++++++++++--------
 2 files changed, 64 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
index 56a4445..5d23b91 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -51,7 +51,7 @@ public class FieldSerializer {
 				clazz = clazz.getSuperclass();
 			}
 		}
-		throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-				+ " (" + fieldName + ")");
+
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7818897..6a67428 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -204,10 +204,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	protected void initializeFields(T t) {
 		for (int i = 0; i < numFields; i++) {
-			try {
-				fields[i].set(t, fieldSerializers[i].createInstance());
-			} catch (IllegalAccessException e) {
-				throw new RuntimeException("Cannot initialize fields.", e);
+			if (fields[i] != null) {
+				try {
+					fields[i].set(t, fieldSerializers[i].createInstance());
+				} catch (IllegalAccessException e) {
+					throw new RuntimeException("Cannot initialize fields.", e);
+				}
 			}
 		}
 	}
@@ -231,13 +233,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			// no subclass
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object copy = fieldSerializers[i].copy(value);
-						fields[i].set(target, copy);
-					}
-					else {
-						fields[i].set(target, null);
+					if (fields[i] != null) {
+						Object value = fields[i].get(from);
+						if (value != null) {
+							Object copy = fieldSerializers[i].copy(value);
+							fields[i].set(target, copy);
+						} else {
+							fields[i].set(target, null);
+						}
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -268,20 +271,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		if (actualType == clazz) {
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object value = fields[i].get(from);
-					if (value != null) {
-						Object reuseValue = fields[i].get(reuse);
-						Object copy;
-						if(reuseValue != null) {
-							copy = fieldSerializers[i].copy(value, reuseValue);
-						}
-						else {
-							copy = fieldSerializers[i].copy(value);
+					if (fields[i] != null) {
+						Object value = fields[i].get(from);
+						if (value != null) {
+							Object reuseValue = fields[i].get(reuse);
+							Object copy;
+							if (reuseValue != null) {
+								copy = fieldSerializers[i].copy(value, reuseValue);
+							} else {
+								copy = fieldSerializers[i].copy(value);
+							}
+							fields[i].set(reuse, copy);
+						} else {
+							fields[i].set(reuse, null);
 						}
-						fields[i].set(reuse, copy);
-					}
-					else {
-						fields[i].set(reuse, null);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -342,7 +345,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		if ((flags & NO_SUBCLASS) != 0) {
 			try {
 				for (int i = 0; i < numFields; i++) {
-					Object o = fields[i].get(value);
+					Object o = (fields[i] != null) ? fields[i].get(value) : null;
 					if (o == null) {
 						target.writeBoolean(true); // null field handling
 					} else {
@@ -400,11 +403,17 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			try {
 				for (int i = 0; i < numFields; i++) {
 					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(target, null);
-					} else {
-						Object field = fieldSerializers[i].deserialize(source);
-						fields[i].set(target, field);
+
+					if (fields[i] != null) {
+						if (isNull) {
+							fields[i].set(target, null);
+						} else {
+							Object field = fieldSerializers[i].deserialize(source);
+							fields[i].set(target, field);
+						}
+					} else if (!isNull) {
+						// read and dump a pre-existing field value
+						fieldSerializers[i].deserialize(source);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -465,20 +474,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 			try {
 				for (int i = 0; i < numFields; i++) {
 					boolean isNull = source.readBoolean();
-					if (isNull) {
-						fields[i].set(reuse, null);
-					} else {
-						Object field;
 
-						Object reuseField = fields[i].get(reuse);
-						if(reuseField != null) {
-							field = fieldSerializers[i].deserialize(reuseField, source);
-						}
-						else {
-							field = fieldSerializers[i].deserialize(source);
-						}
+					if (fields[i] != null) {
+						if (isNull) {
+							fields[i].set(reuse, null);
+						} else {
+							Object field;
 
-						fields[i].set(reuse, field);
+							Object reuseField = fields[i].get(reuse);
+							if (reuseField != null) {
+								field = fieldSerializers[i].deserialize(reuseField, source);
+							} else {
+								field = fieldSerializers[i].deserialize(source);
+							}
+
+							fields[i].set(reuse, field);
+						}
+					} else if (!isNull) {
+						// read and dump a pre-existing field value
+						fieldSerializers[i].deserialize(source);
 					}
 				}
 			} catch (IllegalAccessException e) {
@@ -1012,8 +1026,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	// --------------------------------------------------------------------------------------------
 
-	private void writeObject(ObjectOutputStream out)
-		throws IOException, ClassNotFoundException {
+	private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
 		out.defaultWriteObject();
 		out.writeInt(fields.length);
 		for (Field field: fields) {
@@ -1021,12 +1034,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		}
 	}
 
-	private void readObject(ObjectInputStream in)
-		throws IOException, ClassNotFoundException {
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
 		int numFields = in.readInt();
 		fields = new Field[numFields];
 		for (int i = 0; i < numFields; i++) {
+			// the deserialized Field may be null if the field no longer exists in the POJO;
+			// in this case, when de-/serializing and copying instances using this serializer
+			// instance, the missing fields will simply be skipped
 			fields[i] = FieldSerializer.deserializeField(in);
 		}
 
@@ -1128,7 +1143,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	private int findField(String fieldName) {
 		int foundIndex = 0;
 		for (Field field : fields) {
-			if (fieldName.equals(field.getName())) {
+			if (field != null && fieldName.equals(field.getName())) {
 				return foundIndex;
 			}