You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:29 UTC

[14/15] flink git commit: [FLINK-5366] Add Initial version of SavepointUtil

[FLINK-5366] Add Initial version of SavepointUtil

This will serve as the basis for end-to-end tests of savepoint restore
from Flink 1.1.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/434013af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/434013af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/434013af

Branch: refs/heads/master
Commit: 434013afc91325697a659f736eb816a631659d36
Parents: b43067b
Author: twalthr <tw...@apache.org>
Authored: Wed Dec 14 09:17:54 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:54 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/utils/SavepointUtil.java | 341 +++++++++++++++++++
 .../utils/UserFunctionStateJob.java             | 113 ++++++
 .../src/test/resources/log4j-test.properties    |   2 +-
 flink-tests/src/test/resources/log4j.properties |  27 ++
 4 files changed, 482 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
new file mode 100644
index 0000000..85e21c5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointUtil.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+public class SavepointUtil {
+
+	// list of test job classes to create savepoints for
+	private static final ArrayList<Class<? extends SavepointTestJob>> savepointJobs = new ArrayList<>();
+	static {
+		savepointJobs.add(UserFunctionStateJob.class);
+	}
+
+	private static final Logger LOG = LoggerFactory.getLogger(SavepointUtil.class);
+	private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+	private static final String SAVEPOINT_BASE_DIR = "./flink-tests/src/test/resources/savepoints/";
+
+	private static final int STATE_WAIT_FOR_JOB = 0;
+	private static final int STATE_REQUEST_SAVEPOINT = 1;
+	private static final int STATE_SAVEPOINT_DONE = 2;
+	private static final int STATE_WAIT_FOR_TEST_JOB = 3;
+	private static final int STATE_TEST_RESULT = 4;
+	private static final int STATE_END = 5;
+
+	private static volatile int state = STATE_WAIT_FOR_JOB;
+
+	private static TestingCluster flink = null;
+	private static ActorGateway jobManager = null;
+	private static JobID jobId = null;
+	private static File savepointDir = null;
+	private static Exception testResult = null;
+
+	public static void main(String[] args) throws Exception {
+
+		// clean up
+//		FileUtils.deleteDirectory(new File(SAVEPOINT_BASE_DIR));
+
+		for (Class<? extends SavepointTestJob> testJob : savepointJobs) {
+			SavepointTestJob job = testJob.newInstance();
+
+//			runJobAndCreateSavepoint(job);
+
+			runJobAndCompareState(job);
+
+			triggerEndOfTest();
+		}
+	}
+
+	public static synchronized void triggerSavepoint() {
+		SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
+	}
+
+	public static synchronized boolean allowStateChange() {
+		return SavepointUtil.state < SavepointUtil.STATE_REQUEST_SAVEPOINT;
+	}
+
+	public static synchronized void triggerOrTestSavepoint(RichFunction function, Object expected, Object actual) throws Exception {
+		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
+			if (expected.equals(actual)) {
+				LOG.info("Test was successful.");
+				SavepointUtil.testResult = null;
+				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+			} else {
+				LOG.info("Test failed.");
+				SavepointUtil.testResult = new Exception("Comparison of state failed. Expected: " + expected + " but was: " + actual);
+				SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+			}
+		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
+			final StateCondition condition = new StateCondition(function.getClass(), function.getRuntimeContext().getIndexOfThisSubtask());
+			if (!testCounters.containsKey(condition)) {
+				testCounters.put(condition, 0);
+			}
+			final Integer counter = testCounters.get(condition);
+			testCounters.put(condition, counter + 1);
+			// check if all counters are ready
+			if (checkIfReadyForSavepoint()) {
+				SavepointUtil.state = SavepointUtil.STATE_REQUEST_SAVEPOINT;
+			}
+		}
+	}
+
+	public static void triggerEndOfTest() throws Exception {
+		LOG.info("Cancelling Flink.");
+		if (flink != null) {
+			flink.stop();
+		}
+		SavepointUtil.state = SavepointUtil.STATE_END;
+	}
+
+	public static void runJobAndCreateSavepoint(SavepointTestJob job) throws Exception {
+		LOG.info("Waiting for job.");
+		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_JOB;
+
+		final Thread t = new Thread(new SavepointPerformer());
+		t.start();
+
+		runJob(job);
+
+		while(SavepointUtil.state != SavepointUtil.STATE_SAVEPOINT_DONE && DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+		}
+	}
+
+	public static void runJobAndCompareState(SavepointTestJob job) throws Exception {
+		LOG.info("Waiting for test job.");
+		SavepointUtil.state = SavepointUtil.STATE_WAIT_FOR_TEST_JOB;
+
+		runJob(job);
+
+		while(SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT && DEADLINE.hasTimeLeft()) {
+			Thread.sleep(100);
+		}
+
+		if (SavepointUtil.state != SavepointUtil.STATE_TEST_RESULT) {
+			throw new Exception("No test result available.");
+		}
+		if (testResult != null) {
+			throw testResult;
+		}
+	}
+
+	public static void setTestResult(Exception e) {
+		SavepointUtil.testResult = e;
+		SavepointUtil.state = SavepointUtil.STATE_TEST_RESULT;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Starts a fresh TestingCluster and submits the job detached; in restore mode the job resumes from the first entry of its savepoint directory. */
+	private static void runJob(SavepointTestJob job) throws Exception {
+		// Config
+		int numTaskManagers = 2;
+		int numSlotsPerTaskManager = 2;
+		String savepointPath = SAVEPOINT_BASE_DIR + job.getClass().getSimpleName();
+
+		// Flink configuration
+		final Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+
+		final File checkpointDir = java.nio.file.Files.createTempDirectory("checkpoints").toFile(); // a real directory, not a temp file
+		savepointDir = new File(savepointPath);
+		savepointDir.mkdirs();
+
+		if (!checkpointDir.exists() || !savepointDir.exists()) {
+			throw new Exception("Test setup failed: failed to create (temporary) directories.");
+		}
+
+		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
+		LOG.info("Created savepoint directory: " + savepointDir + ".");
+
+		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
+		config.setString("state.savepoints.dir", savepointDir.toURI().toString());
+
+		LOG.info("Flink configuration: " + config + ".");
+
+		// Start Flink
+		flink = new TestingCluster(config);
+		flink.start();
+
+		// Retrieve the job manager
+		jobManager = Await.result(flink.leaderGateway().future(), DEADLINE.timeLeft());
+
+		// Submit the job
+		final JobGraph jobGraph = job.createJobGraph();
+		if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_JOB) {
+			savepointCondition = job.getSavepointCondition();
+			testCounters.clear();
+		} else if (SavepointUtil.state == SavepointUtil.STATE_WAIT_FOR_TEST_JOB) {
+			final File[] dir = savepointDir.listFiles();
+			if (dir == null || dir.length == 0) { // listFiles() yields null if the path cannot be listed
+				throw new RuntimeException("Savepoint of " + job.getClass().getSimpleName() + " does not exist.");
+			}
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(dir[0].getAbsolutePath()));
+		}
+		jobId = jobGraph.getJobID();
+
+		LOG.info("Submitting job " + jobGraph.getJobID() + " and waiting...");
+
+		flink.submitJobDetached(jobGraph);
+	}
+
+	private static final HashMap<StateCondition, Integer> testCounters = new HashMap<>();
+	private static SavepointCondition[] savepointCondition = null;
+
+	private static boolean checkIfReadyForSavepoint() {
+		for (SavepointCondition condition : savepointCondition) {
+			final StateCondition stateCondition = new StateCondition(condition.clazz, condition.subtask);
+			if (!testCounters.containsKey(stateCondition) || testCounters.get(stateCondition) != condition.invocation) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static void performSavepointAndShutdown() throws Exception {
+		LOG.info("Triggering a savepoint.");
+
+		// Flink 1.2
+		final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId, Option.<String>empty()), DEADLINE.timeLeft());
+		// Flink 1.1
+//        final Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobId), DEADLINE.timeLeft());
+
+		final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, DEADLINE.timeLeft())).savepointPath();
+		LOG.info("Saved savepoint: " + savepointPath);
+
+		// Retrieve the savepoint from the testing job manager
+		LOG.info("Requesting the savepoint.");
+		Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(savepointPath), DEADLINE.timeLeft());
+
+		Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
+		LOG.info("Retrieved savepoint: " + savepointPath + ".");
+
+		LOG.info("Storing savepoint to file.");
+
+		// Flink 1.2
+		// in Flink 1.2 the savepoint may already have been written to file at this point;
+		// the commented-out call below only shows how to store it explicitly in 1.2
+//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepoint(savepointDir.getAbsolutePath(), savepoint);
+		// Flink 1.1
+		// this writes it for Flink 1.1
+//        Configuration config = new Configuration();
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
+//        config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file://" + savepointDir.getAbsolutePath());
+//        org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
+
+		LOG.info("Cancelling Flink.");
+		flink.stop();
+
+		SavepointUtil.state = SavepointUtil.STATE_SAVEPOINT_DONE;
+	}
+
+	/** Map key pairing a function class with a subtask index; equality and hash use both fields. */
+	private static class StateCondition {
+		private Class<?> clazz;
+		private Integer subtask;
+		StateCondition(Class<?> clazz, Integer subtask) {
+			this.clazz = clazz;
+			this.subtask = subtask;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final StateCondition other = (StateCondition) o;
+			return clazz.equals(other.clazz) && subtask.equals(other.subtask);
+		}
+
+		@Override
+		public int hashCode() {
+			return 31 * clazz.hashCode() + subtask.hashCode();
+		}
+	}
+
+	public static class SavepointCondition {
+		Class<? extends RichFunction> clazz;
+		int subtask;
+		int invocation;
+
+		SavepointCondition(Class<? extends RichFunction> clazz, int subtask, int invocation) {
+			this.clazz = clazz;
+			this.subtask = subtask;
+			this.invocation = invocation;
+		}
+	}
+
+	public interface SavepointTestJob {
+		JobGraph createJobGraph();
+
+		SavepointCondition[] getSavepointCondition();
+	}
+
+	private static class SavepointPerformer implements Runnable {
+		// Polls SavepointUtil.state every 100 ms until STATE_END; performs the savepoint when one is requested.
+		@Override
+		public void run() {
+			try {
+				while (SavepointUtil.state != SavepointUtil.STATE_END) {
+					Thread.sleep(100);
+					if (SavepointUtil.state == SavepointUtil.STATE_REQUEST_SAVEPOINT) {
+						try {
+							performSavepointAndShutdown();
+						} catch (Exception e) {
+							throw new RuntimeException("Performing savepoint failed.", e);
+						}
+					}
+				}
+			} catch (InterruptedException e) {
+				// stop execution, but keep the interrupt visible to whoever owns this thread
+				Thread.currentThread().interrupt();
+			}
+			LOG.info("SavepointPerformer Thread finished.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
new file mode 100644
index 0000000..1df7938
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/UserFunctionStateJob.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointCondition;
+import org.apache.flink.test.checkpointing.utils.SavepointUtil.SavepointTestJob;
+import org.apache.flink.util.Collector;
+
+public class UserFunctionStateJob implements SavepointTestJob {
+
+	@Override
+	public JobGraph createJobGraph() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		// so far we only test the memory state backend
+		env.setStateBackend(new MemoryStateBackend());
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setMaxParallelism(1);
+
+		// create source
+		final DataStream<Tuple2<Long, Long>> source = env
+			.addSource(new SourceFunction<Tuple2<Long, Long>>() {
+
+				private volatile boolean isRunning = true;
+
+				@Override
+				public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+					while (isRunning) {
+						synchronized (ctx.getCheckpointLock()) {
+							ctx.collect(new Tuple2<>(1L, 1L));
+						}
+					}
+				}
+
+				@Override
+				public void cancel() {
+					isRunning = false;
+				}
+			}).uid("CustomSourceFunction");
+
+		// non-keyed operator state
+		source.flatMap(new SumFlatMapperNonKeyedCheckpointed()).uid("SumFlatMapperNonKeyedCheckpointed").startNewChain().print();
+
+		return env.getStreamGraph().getJobGraph();
+	}
+
+	@Override
+	public SavepointCondition[] getSavepointCondition() {
+		return new SavepointCondition[] {
+			new SavepointCondition(SumFlatMapperNonKeyedCheckpointed.class, 0, 4)
+		};
+	}
+
+	public static class SumFlatMapperNonKeyedCheckpointed extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+			implements Checkpointed<Tuple2<Long, Long>> {
+
+		private transient Tuple2<Long, Long> sum;
+
+		@Override
+		public void restoreState(Tuple2<Long, Long> state) throws Exception {
+			sum = state;
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			if (SavepointUtil.allowStateChange()) {
+				if (sum == null) {
+					sum = value;
+					out.collect(sum);
+				} else {
+					sum.f1 += value.f1;
+					out.collect(sum);
+				}
+			}
+
+			SavepointUtil.triggerOrTestSavepoint(
+				this,
+				new Tuple2<>(value.f1, value.f1 * 4),
+				sum);
+		}
+
+		@Override
+		public Tuple2<Long, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return sum;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index f55d9e2..7bfcee5 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/434013af/flink-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j.properties b/flink-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9fbcecd
--- /dev/null
+++ b/flink-tests/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=INFO, console
+
+# Log all infos to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n