You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 16:30:00 UTC
[03/12] flink git commit: [FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase
[FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33c49e79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33c49e79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33c49e79
Branch: refs/heads/master
Commit: 33c49e79fb1d21f52a8a6482baf43253520d365e
Parents: 539787b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jun 2 17:18:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200
----------------------------------------------------------------------
.../utils/SavepointMigrationTestBase.java | 8 ++-
...atefulJobSavepointFrom12MigrationITCase.java | 19 +++---
...atefulJobSavepointFrom13MigrationITCase.java | 58 +++++++++++++++++++
.../_metadata | Bin 0 -> 36467 bytes
.../_metadata | Bin 0 -> 36395 bytes
5 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index c5672a2..e4004c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -169,7 +169,13 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
- FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+ File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+ // savepoints were changed to be directories in Flink 1.3
+ if (jobManagerSavepoint.isDirectory()) {
+ FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+ } else {
+ FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+ }
}
@SafeVarargs
http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index 5f03195..4a1d181 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -76,7 +76,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- // we only test memory state backend yet
env.setStateBackend(new MemoryStateBackend());
env.enableCheckpointing(500);
env.setParallelism(4);
@@ -103,7 +102,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
executeAndSavepoint(
env,
- "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+ "src/test/resources/" + getSavepointPath(),
new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
@@ -144,7 +143,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
executeAndSavepoint(
env,
- "src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint",
+ "src/test/resources/" + getRocksDBSavepointPath(),
new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
@@ -155,7 +154,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- // we only test memory state backend yet
env.setStateBackend(new MemoryStateBackend());
env.enableCheckpointing(500);
env.setParallelism(4);
@@ -182,7 +180,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
- getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+ getResourceFilename(getSavepointPath()),
new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -201,7 +199,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- // we only test memory state backend yet
env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
env.enableCheckpointing(500);
env.setParallelism(4);
@@ -228,7 +225,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
restoreAndExecute(
env,
- getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"),
+ getResourceFilename(getRocksDBSavepointPath()),
new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -241,6 +238,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
}
+ protected String getSavepointPath() {
+ return "stateful-udf-migration-itcase-flink1.2-savepoint";
+ }
+
+ protected String getRocksDBSavepointPath() {
+ return "stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint";
+ }
+
private static class LegacyCheckpointedSource
implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
new file mode 100644
index 0000000..a2d3201
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.3 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulJobSavepointFrom13MigrationITCase extends StatefulJobSavepointFrom12MigrationITCase {
+
+ /**
+ * This has to be manually executed to create the savepoint on Flink 1.3.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink13() throws Exception {
+ testCreateSavepointOnFlink12();
+ }
+
+ /**
+ * This has to be manually executed to create the RocksDB savepoint on Flink 1.3.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink13WithRocksDB() throws Exception {
+ testCreateSavepointOnFlink12WithRocksDB();
+ }
+
+ @Override
+ protected String getSavepointPath() {
+ return "stateful-udf-migration-itcase-flink1.3-savepoint";
+ }
+
+ @Override
+ protected String getRocksDBSavepointPath() {
+ return "stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..8f22bcb
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ
http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
new file mode 100644
index 0000000..8ca91ec
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata differ