You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 16:30:00 UTC

[03/12] flink git commit: [FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase

[FLINK-6830] [tests] Add StatefulJobSavepointFrom13MigrationITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33c49e79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33c49e79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33c49e79

Branch: refs/heads/master
Commit: 33c49e79fb1d21f52a8a6482baf43253520d365e
Parents: 539787b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Jun 2 17:18:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200

----------------------------------------------------------------------
 .../utils/SavepointMigrationTestBase.java       |   8 ++-
 ...atefulJobSavepointFrom12MigrationITCase.java |  19 +++---
 ...atefulJobSavepointFrom13MigrationITCase.java |  58 +++++++++++++++++++
 .../_metadata                                   | Bin 0 -> 36467 bytes
 .../_metadata                                   | Bin 0 -> 36395 bytes
 5 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index c5672a2..e4004c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -169,7 +169,13 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
 		LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
 
-		FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
+		File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
+		// savepoints were changed to be directories in Flink 1.3
+		if (jobManagerSavepoint.isDirectory()) {
+			FileUtils.moveDirectory(jobManagerSavepoint, new File(savepointPath));
+		} else {
+			FileUtils.moveFile(jobManagerSavepoint, new File(savepointPath));
+		}
 	}
 
 	@SafeVarargs

http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
index 5f03195..4a1d181 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -76,7 +76,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// we only test memory state backend yet
 		env.setStateBackend(new MemoryStateBackend());
 		env.enableCheckpointing(500);
 		env.setParallelism(4);
@@ -103,7 +102,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		executeAndSavepoint(
 				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+				"src/test/resources/" + getSavepointPath(),
 				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
@@ -144,7 +143,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		executeAndSavepoint(
 				env,
-				"src/test/resources/stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint",
+				"src/test/resources/" + getRocksDBSavepointPath(),
 				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
@@ -155,7 +154,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// we only test memory state backend yet
 		env.setStateBackend(new MemoryStateBackend());
 		env.enableCheckpointing(500);
 		env.setParallelism(4);
@@ -182,7 +180,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		restoreAndExecute(
 				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+				getResourceFilename(getSavepointPath()),
 				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
 				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
 				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -201,7 +199,6 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setRestartStrategy(RestartStrategies.noRestart());
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		// we only test memory state backend yet
 		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 		env.enableCheckpointing(500);
 		env.setParallelism(4);
@@ -228,7 +225,7 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 
 		restoreAndExecute(
 				env,
-				getResourceFilename("stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint"),
+				getResourceFilename(getRocksDBSavepointPath()),
 				new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
 				new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
 				new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
@@ -241,6 +238,14 @@ public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigratio
 				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
 	}
 
+	protected String getSavepointPath() {
+		return "stateful-udf-migration-itcase-flink1.2-savepoint";
+	}
+
+	protected String getRocksDBSavepointPath() {
+		return "stateful-udf-migration-itcase-flink1.2-rocksdb-savepoint";
+	}
+
 	private static class LegacyCheckpointedSource
 			implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
new file mode 100644
index 0000000..a2d3201
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom13MigrationITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.3 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulJobSavepointFrom13MigrationITCase extends StatefulJobSavepointFrom12MigrationITCase {
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.3.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink13() throws Exception {
+		testCreateSavepointOnFlink12();
+	}
+
+	/**
+	 * This has to be manually executed to create the savepoint on Flink 1.3.
+	 */
+	@Test
+	@Ignore
+	public void testCreateSavepointOnFlink13WithRocksDB() throws Exception {
+		testCreateSavepointOnFlink12WithRocksDB();
+	}
+
+	@Override
+	protected String getSavepointPath() {
+		return "stateful-udf-migration-itcase-flink1.3-savepoint";
+	}
+
+	@Override
+	protected String getRocksDBSavepointPath() {
+		return "stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..8f22bcb
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/33c49e79/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata
new file mode 100644
index 0000000..8ca91ec
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.3-savepoint/_metadata differ