You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:53 UTC

[06/12] flink git commit: [FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3

[FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f720706
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f720706
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f720706

Branch: refs/heads/release-1.3
Commit: 8f72070619aef229f348a3d85d91094c8a640fbf
Parents: 7a4d016
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Jun 3 21:56:54 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:52:23 2017 +0200

----------------------------------------------------------------------
 .../AbstractOperatorRestoreTestBase.java        |  14 ++--
 .../state/operator/restore/ExecutionMode.java   |   6 +-
 .../AbstractKeyedOperatorRestoreTestBase.java   |  70 +++++++++++++++++++
 .../restore/keyed/KeyedComplexChainTest.java    |  22 +-----
 .../state/operator/restore/keyed/KeyedJob.java  |   7 +-
 ...AbstractNonKeyedOperatorRestoreTestBase.java |  26 ++++++-
 .../restore/unkeyed/ChainBreakTest.java         |   4 ++
 .../unkeyed/ChainLengthDecreaseTest.java        |   4 ++
 .../unkeyed/ChainLengthIncreaseTest.java        |   4 ++
 .../restore/unkeyed/ChainOrderTest.java         |   4 ++
 .../restore/unkeyed/ChainUnionTest.java         |   4 ++
 .../operator/restore/unkeyed/NonKeyedJob.java   |   6 +-
 .../complexKeyed-flink1.2/_metadata             | Bin 0 -> 137490 bytes
 .../complexKeyed-flink1.3/_metadata             | Bin 0 -> 163526 bytes
 .../operatorstate/complexKeyed/_metadata        | Bin 137490 -> 0 bytes
 .../operatorstate/nonKeyed-flink1.2/_metadata   | Bin 0 -> 3212 bytes
 .../operatorstate/nonKeyed-flink1.3/_metadata   | Bin 0 -> 6248 bytes
 .../resources/operatorstate/nonKeyed/_metadata  | Bin 3212 -> 0 bytes
 18 files changed, 130 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 82e8d94..f087cf4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -65,11 +65,11 @@ import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified
- * from that point on.
+ * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink versions and that the
+ * topology can be modified from that point on.
  * 
  * The verification is done in 2 Steps:
- * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.
+ * Step 1: Migrate the job to the newer version by submitting the same job used for the old version savepoint, and create a new savepoint.
  * Step 2: Modify the job topology, and restore from the savepoint created in step 1.
  */
 public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
@@ -160,9 +160,9 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 
 	@Test
 	public void testMigrationAndRestore() throws Throwable {
-		// submit 1.2 job and create a migrated 1.3 savepoint
+		// submit job with old version savepoint and create a migrated savepoint in the new version
 		String savepointPath = migrateJob();
-		// restore from migrated 1.3 savepoint
+		// restore from migrated new version savepoint
 		restoreJob(savepointPath);
 	}
 
@@ -256,14 +256,14 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 	}
 
 	/**
-	 * Recreates the job used to create the 1.2 savepoint.
+	 * Recreates the job used to create the old version savepoint.
 	 *
 	 * @param env StreamExecutionEnvironment to use
 	 */
 	protected abstract void createMigrationJob(StreamExecutionEnvironment env);
 
 	/**
-	 * Creates a modified version of the job used to create the 1.2 savepoint.
+	 * Creates a modified version of the job used to create the old version savepoint.
 	 *
 	 * @param env StreamExecutionEnvironment to use
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
index f333aca..ae9fb21 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.state.operator.restore;
 /**
  * Enum to control function behavior for the different test stages.
  * 
- * {@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint.
- * {@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3.
- * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint.
+ * {@link ExecutionMode#GENERATE} should be used when creating the savepoint.
+ * {@link ExecutionMode#MIGRATE} should be used when migrating the savepoint to a newer version.
+ * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated newer version savepoint.
  */
 public enum ExecutionMode {
 	GENERATE,

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..1b66c21
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Base class for all keyed operator restore tests.
+ */
+@RunWith(Parameterized.class)
+public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
+
+	private final String savepointPath;
+
+	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+	public static Collection<String> parameters () {
+		return Arrays.asList(
+			"complexKeyed-flink1.2",
+			"complexKeyed-flink1.3");
+	}
+
+	public AbstractKeyedOperatorRestoreTestBase(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
+	@Override
+	public void createMigrationJob(StreamExecutionEnvironment env) {
+		/**
+		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+		 */
+		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
+
+		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
+
+		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
+
+		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+	}
+
+	@Override
+	protected String getMigrationSavepointName() {
+		return savepointPath;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 28cd15a..605722d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -20,23 +20,12 @@ package org.apache.flink.test.state.operator.restore.keyed;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
-public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+public class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {
 
-	@Override
-	public void createMigrationJob(StreamExecutionEnvironment env) {
-		/**
-		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
-		 */
-		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
-
-		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
-
-		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
-
-		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+	public KeyedComplexChainTest(String savepointPath) {
+		super(savepointPath);
 	}
 
 	@Override
@@ -53,9 +42,4 @@ public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
 		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
 		first.startNewChain();
 	}
-
-	@Override
-	protected final String getMigrationSavepointName() {
-		return "complexKeyed";
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 6add7b2..523e937 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -44,7 +44,8 @@ import java.util.Iterator;
 import java.util.List;
 
 /**
- * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different Flink versions.
  *
  * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
  */
@@ -236,8 +237,4 @@ public class KeyedJob {
 			}
 		}
 	}
-
-
-	private KeyedJob() {
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index 5b51765..22fa7b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -23,6 +23,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
 import org.apache.flink.test.state.operator.restore.ExecutionMode;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
@@ -30,10 +36,24 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
 import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
 
 /**
- * All classes extending this class will use the same savepoint and migration job.
+ * Base class for all non-keyed operator restore tests.
  */
+@RunWith(Parameterized.class)
 public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
 
+	private final String savepointPath;
+
+	@Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+	public static Collection<String> parameters () {
+		return Arrays.asList(
+			"nonKeyed-flink1.2",
+			"nonKeyed-flink1.3");
+	}
+
+	public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
 	@Override
 	public void createMigrationJob(StreamExecutionEnvironment env) {
 		/**
@@ -53,7 +73,7 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
 	}
 
 	@Override
-	protected final String getMigrationSavepointName() {
-		return "nonKeyed";
+	protected String getMigrationSavepointName() {
+		return savepointPath;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index 6838070..8055833 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainBreakTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index e405e76..3235387 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -32,6 +32,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainLengthDecreaseTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index b78aa10..a10f99c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainLengthIncreaseTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index 7c68b4e..0baa233 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainOrderTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index 3f2fba4..0d21e8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
  */
 public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
 
+	public ChainUnionTest(String savepointPath) {
+		super(savepointPath);
+	}
+
 	@Override
 	public void createRestoredJob(StreamExecutionEnvironment env) {
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index 32067b3..08a4c67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -37,7 +37,8 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different Flink versions.
  *
  * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
  */
@@ -192,7 +193,4 @@ public class NonKeyedJob {
 			}
 		}
 	}
-
-	private NonKeyedJob() {
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..9e03876
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..a4f5a1e
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
deleted file mode 100644
index 9e03876..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..46169e0
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
deleted file mode 100644
index 8fcd1ea..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata and /dev/null differ