You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:53 UTC
[06/12] flink git commit: [FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3
[FLINK-6830] [tests] Port topology change migration ITCases for Flink 1.3
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f720706
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f720706
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f720706
Branch: refs/heads/release-1.3
Commit: 8f72070619aef229f348a3d85d91094c8a640fbf
Parents: 7a4d016
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Jun 3 21:56:54 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:52:23 2017 +0200
----------------------------------------------------------------------
.../AbstractOperatorRestoreTestBase.java | 14 ++--
.../state/operator/restore/ExecutionMode.java | 6 +-
.../AbstractKeyedOperatorRestoreTestBase.java | 70 +++++++++++++++++++
.../restore/keyed/KeyedComplexChainTest.java | 22 +-----
.../state/operator/restore/keyed/KeyedJob.java | 7 +-
...AbstractNonKeyedOperatorRestoreTestBase.java | 26 ++++++-
.../restore/unkeyed/ChainBreakTest.java | 4 ++
.../unkeyed/ChainLengthDecreaseTest.java | 4 ++
.../unkeyed/ChainLengthIncreaseTest.java | 4 ++
.../restore/unkeyed/ChainOrderTest.java | 4 ++
.../restore/unkeyed/ChainUnionTest.java | 4 ++
.../operator/restore/unkeyed/NonKeyedJob.java | 6 +-
.../complexKeyed-flink1.2/_metadata | Bin 0 -> 137490 bytes
.../complexKeyed-flink1.3/_metadata | Bin 0 -> 163526 bytes
.../operatorstate/complexKeyed/_metadata | Bin 137490 -> 0 bytes
.../operatorstate/nonKeyed-flink1.2/_metadata | Bin 0 -> 3212 bytes
.../operatorstate/nonKeyed-flink1.3/_metadata | Bin 0 -> 6248 bytes
.../resources/operatorstate/nonKeyed/_metadata | Bin 3212 -> 0 bytes
18 files changed, 130 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 82e8d94..f087cf4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -65,11 +65,11 @@ import java.net.URL;
import java.util.concurrent.TimeUnit;
/**
- * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified
- * from that point on.
+ * Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink versions and that the
+ * topology can be modified from that point on.
*
* The verification is done in 2 Steps:
- * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.
+ * Step 1: Migrate the job to the newer version by submitting the same job used for the old version savepoint, and create a new savepoint.
* Step 2: Modify the job topology, and restore from the savepoint created in step 1.
*/
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
@@ -160,9 +160,9 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
@Test
public void testMigrationAndRestore() throws Throwable {
- // submit 1.2 job and create a migrated 1.3 savepoint
+ // submit job with old version savepoint and create a migrated savepoint in the new version
String savepointPath = migrateJob();
- // restore from migrated 1.3 savepoint
+ // restore from migrated new version savepoint
restoreJob(savepointPath);
}
@@ -256,14 +256,14 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
}
/**
- * Recreates the job used to create the 1.2 savepoint.
+ * Recreates the job used to create the old version savepoint.
*
* @param env StreamExecutionEnvironment to use
*/
protected abstract void createMigrationJob(StreamExecutionEnvironment env);
/**
- * Creates a modified version of the job used to create the 1.2 savepoint.
+ * Creates a modified version of the job used to create the old version savepoint.
*
* @param env StreamExecutionEnvironment to use
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
index f333aca..ae9fb21 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.state.operator.restore;
/**
* Enum to control function behavior for the different test stages.
*
- * {@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint.
- * {@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3.
- * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint.
+ * {@link ExecutionMode#GENERATE} should be used when creating the savepoint.
+ * {@link ExecutionMode#MIGRATE} should be used when migrating the savepoint to a newer version.
+ * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated newer version savepoint.
*/
public enum ExecutionMode {
GENERATE,
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..1b66c21
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Base class for all keyed operator restore tests.
+ */
+@RunWith(Parameterized.class)
+public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
+
+ private final String savepointPath;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+ public static Collection<String> parameters () {
+ return Arrays.asList(
+ "complexKeyed-flink1.2",
+ "complexKeyed-flink1.3");
+ }
+
+ public AbstractKeyedOperatorRestoreTestBase(String savepointPath) {
+ this.savepointPath = savepointPath;
+ }
+
+ @Override
+ public void createMigrationJob(StreamExecutionEnvironment env) {
+ /**
+ * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+ */
+ SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
+
+ SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
+
+ SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
+
+ SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+ }
+
+ @Override
+ protected String getMigrationSavepointName() {
+ return savepointPath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 28cd15a..605722d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -20,23 +20,12 @@ package org.apache.flink.test.state.operator.restore.keyed;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
-public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+public class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {
- @Override
- public void createMigrationJob(StreamExecutionEnvironment env) {
- /**
- * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
- */
- SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
-
- SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
-
- SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
-
- SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+ public KeyedComplexChainTest(String savepointPath) {
+ super(savepointPath);
}
@Override
@@ -53,9 +42,4 @@ public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
first.startNewChain();
}
-
- @Override
- protected final String getMigrationSavepointName() {
- return "complexKeyed";
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index 6add7b2..523e937 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -44,7 +44,8 @@ import java.util.Iterator;
import java.util.List;
/**
- * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different Flink versions.
*
* The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
*/
@@ -236,8 +237,4 @@ public class KeyedJob {
}
}
}
-
-
- private KeyedJob() {
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index 5b51765..22fa7b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -23,6 +23,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
@@ -30,10 +36,24 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
/**
- * All classes extending this class will use the same savepoint and migration job.
+ * Base class for all non-keyed operator restore tests.
*/
+@RunWith(Parameterized.class)
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
+ private final String savepointPath;
+
+ @Parameterized.Parameters(name = "Migrate Savepoint: {0}")
+ public static Collection<String> parameters () {
+ return Arrays.asList(
+ "nonKeyed-flink1.2",
+ "nonKeyed-flink1.3");
+ }
+
+ public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath) {
+ this.savepointPath = savepointPath;
+ }
+
@Override
public void createMigrationJob(StreamExecutionEnvironment env) {
/**
@@ -53,7 +73,7 @@ public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOp
}
@Override
- protected final String getMigrationSavepointName() {
- return "nonKeyed";
+ protected String getMigrationSavepointName() {
+ return savepointPath;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index 6838070..8055833 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*/
public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
+ public ChainBreakTest(String savepointPath) {
+ super(savepointPath);
+ }
+
@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index e405e76..3235387 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -32,6 +32,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*/
public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+ public ChainLengthDecreaseTest(String savepointPath) {
+ super(savepointPath);
+ }
+
@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index b78aa10..a10f99c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*/
public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+ public ChainLengthIncreaseTest(String savepointPath) {
+ super(savepointPath);
+ }
+
@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index 7c68b4e..0baa233 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*/
public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
+ public ChainOrderTest(String savepointPath) {
+ super(savepointPath);
+ }
+
@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index 3f2fba4..0d21e8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -33,6 +33,10 @@ import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*/
public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
+ public ChainUnionTest(String savepointPath) {
+ super(savepointPath);
+ }
+
@Override
public void createRestoredJob(StreamExecutionEnvironment env) {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index 32067b3..08a4c67 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -37,7 +37,8 @@ import java.util.Arrays;
import java.util.List;
/**
- * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Savepoint generator to create the savepoint used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ * Switch to specific version branches and run this job to create savepoints of different Flink versions.
*
* The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
*/
@@ -192,7 +193,4 @@ public class NonKeyedJob {
}
}
}
-
- private NonKeyedJob() {
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..9e03876
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..a4f5a1e
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.3/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
deleted file mode 100644
index 9e03876..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.2/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata
new file mode 100644
index 0000000..46169e0
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed-flink1.3/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/8f720706/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
deleted file mode 100644
index 8fcd1ea..0000000
Binary files a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata and /dev/null differ