You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/04/26 21:42:48 UTC

[3/3] flink git commit: [FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems

[FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems

This includes
  - An interface for hooks that are called by the checkpoint coordinator to trigger/restore a checkpoint
  - A source extension that triggers the operator checkpoints and barrier injection on certain events

Because this changes the checkpoint metadata format, the commit introduces a new metadata format version.

This closes #3782


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90ca4381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90ca4381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90ca4381

Branch: refs/heads/master
Commit: 90ca438106e63c5032ee2ad27e54e9f573eac386
Parents: 6bdaf1e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 27 17:20:47 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 26 21:56:45 2017 +0200

----------------------------------------------------------------------
 .../core/io/SimpleVersionedSerializer.java      |  80 ++++
 .../java/org/apache/flink/util/StringUtils.java |   7 +
 .../checkpoint/savepoint/SavepointV0.java       |  10 +
 .../savepoint/SavepointV0Serializer.java        |  12 +-
 .../checkpoint/CheckpointCoordinator.java       |  80 +++-
 .../checkpoint/CheckpointDeclineReason.java     |   4 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |  31 +-
 .../flink/runtime/checkpoint/MasterState.java   |  62 +++
 .../checkpoint/MasterTriggerRestoreHook.java    | 140 ++++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  23 +-
 .../runtime/checkpoint/hooks/MasterHooks.java   | 273 +++++++++++
 .../runtime/checkpoint/savepoint/Savepoint.java |   6 +
 .../checkpoint/savepoint/SavepointLoader.java   |   3 +-
 .../savepoint/SavepointSerializers.java         |   7 +-
 .../checkpoint/savepoint/SavepointV1.java       |  36 +-
 .../savepoint/SavepointV1Serializer.java        |  76 +--
 .../checkpoint/savepoint/SavepointV2.java       |  91 ++++
 .../savepoint/SavepointV2Serializer.java        | 468 +++++++++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |   9 +
 .../executiongraph/ExecutionGraphBuilder.java   |  18 +
 .../tasks/JobCheckpointingSettings.java         |  48 +-
 .../CheckpointCoordinatorMasterHooksTest.java   | 421 +++++++++++++++++
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/CompletedCheckpointTest.java     |  24 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   1 +
 .../savepoint/CheckpointTestUtils.java          | 184 ++++++++
 .../savepoint/SavepointLoaderTest.java          |   2 +-
 .../savepoint/SavepointStoreTest.java           |  27 +-
 .../savepoint/SavepointV1SerializerTest.java    |  17 +-
 .../checkpoint/savepoint/SavepointV1Test.java   | 157 -------
 .../savepoint/SavepointV2SerializerTest.java    | 148 ++++++
 .../checkpoint/savepoint/SavepointV2Test.java   |  68 +++
 .../ArchivedExecutionGraphTest.java             |   2 +
 .../api/checkpoint/ExternallyInducedSource.java |  75 +++
 .../checkpoint/WithMasterCheckpointHook.java    |  38 ++
 .../FunctionMasterCheckpointHookFactory.java    |  45 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  28 +-
 .../runtime/tasks/SourceStreamTask.java         |  56 +++
 .../WithMasterCheckpointHookConfigTest.java     | 189 ++++++++
 .../runtime/io/StreamRecordWriterTest.java      |   5 -
 .../SourceExternalCheckpointTriggerTest.java    | 171 +++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |   7 +-
 .../test/checkpointing/SavepointITCase.java     |   4 +-
 43 files changed, 2874 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
new file mode 100644
index 0000000..6c061a5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * A simple serializer interface for versioned serialization.
+ * 
+ * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
+ * to the serialized data. When the serializer evolves, the version can be used to identify
+ * with which prior version the data was serialized.
+ * 
+ * <pre>{@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ * 
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
+ * 
+ * }</pre>
+ * 
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+	/**
+	 * Gets the version with which this serializer serializes.
+	 * 
+	 * @return The version of the serialization schema.
+	 */
+	@Override
+	int getVersion();
+
+	/**
+	 * Serializes the given object. The serialization is assumed to correspond to the
+	 * current serialization version (as returned by {@link #getVersion()}).
+	 *
+	 * 
+	 * @param obj The object to serialize.
+	 * @return The serialized data (bytes).
+	 * 
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	byte[] serialize(E obj) throws IOException;
+
+	/**
+	 * De-serializes the given data (bytes) which was serialized with the scheme of the
+	 * indicated version.
+	 * 
+	 * @param version The version in which the data was serialized
+	 * @param serialized The serialized data
+	 * @return The deserialized object
+	 * 
+	 * @throws IOException Thrown, if the deserialization fails.
+	 */
+	E deserialize(int version, byte[] serialized) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b84f602..abd6ba6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -309,6 +309,13 @@ public final class StringUtils {
 		}
 	}
 
+	/**
+	 * Checks if the string is null, empty, or contains only whitespace characters.
+	 * A whitespace character is defined via {@link Character#isWhitespace(char)}.
+	 * 
+	 * @param str The string to check
+	 * @return True, if the string is null or blank, false otherwise.
+	 */
 	public static boolean isNullOrWhitespaceOnly(String str) {
 		if (str == null || str.length() == 0) {
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
index 1c51a69..f3ec1cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
@@ -19,6 +19,7 @@
 package org.apache.flink.migration.runtime.checkpoint.savepoint;
 
 import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.util.Preconditions;
 
@@ -58,6 +59,15 @@ public class SavepointV0 implements Savepoint {
 
 	@Override
 	public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
+		// since checkpoints are never deserialized into this format,
+		// this method should never be called
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Collection<MasterState> getMasterStates() {
+		// since checkpoints are never deserialized into this format,
+		// this method should never be called
 		throw new UnsupportedOperationException();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 4739033..d285906 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -34,7 +34,7 @@ import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
 import org.apache.flink.migration.util.SerializedValue;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -68,7 +68,7 @@ import java.util.Map;
  * don't rely on any involved Java classes to stay the same.
  */
 @SuppressWarnings("deprecation")
-public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV2> {
 
 	public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
 	private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
@@ -81,12 +81,12 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 
 
 	@Override
-	public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+	public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
 		throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
 	}
 
 	@Override
-	public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
+	public SavepointV2 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
 
 		long checkpointId = dis.readLong();
 
@@ -165,7 +165,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 		return serializedValue;
 	}
 
-	private SavepointV1 convertSavepoint(
+	private SavepointV2 convertSavepoint(
 			List<TaskState> taskStates,
 			ClassLoader userClassLoader,
 			long checkpointID) throws Exception {
@@ -176,7 +176,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 			newTaskStates.add(convertTaskState(taskState, userClassLoader, checkpointID));
 		}
 
-		return new SavepointV1(checkpointID, newTaskStates);
+		return new SavepointV2(checkpointID, newTaskStates);
 	}
 
 	private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 256321e..23a38d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
@@ -39,7 +41,10 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +143,9 @@ public class CheckpointCoordinator {
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
 	private final ScheduledThreadPoolExecutor timer;
 
+	/** The master checkpoint hooks executed by this checkpoint coordinator */
+	private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
+
 	/** Actor that receives status updates from the execution graph this coordinator works for */
 	private JobStatusListener jobStatusListener;
 
@@ -220,6 +228,7 @@ public class CheckpointCoordinator {
 		this.executor = checkNotNull(executor);
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
+		this.masterHooks = new HashMap<>();
 
 		this.timer = new ScheduledThreadPoolExecutor(1,
 				new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
@@ -245,6 +254,45 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Configuration
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Adds the given master hook to the checkpoint coordinator. This method does nothing, if
+	 * the checkpoint coordinator already contained a hook with the same ID (as defined via
+	 * {@link MasterTriggerRestoreHook#getIdentifier()}).
+	 * 
+	 * @param hook The hook to add.
+	 * @return True, if the hook was added, false if the checkpoint coordinator already
+	 *         contained a hook with the same ID.
+	 */
+	public boolean addMasterHook(MasterTriggerRestoreHook<?> hook) {
+		checkNotNull(hook);
+
+		final String id = hook.getIdentifier();
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(id), "The hook has a null or empty id");
+
+		synchronized (lock) {
+			if (!masterHooks.containsKey(id)) {
+				masterHooks.put(id, hook);
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+
+	/**
+	 * Gets the number of currently registered master hooks.
+	 */
+	public int getNumberOfRegisteredMasterHooks() {
+		synchronized (lock) {
+			return masterHooks.size();
+		}
+	}
+
 	/**
 	 * Sets the checkpoint stats tracker.
 	 *
@@ -492,6 +540,20 @@ public class CheckpointCoordinator {
 				checkpoint.setStatsCallback(callback);
 			}
 
+			// trigger the master hooks for the checkpoint
+			try {
+				List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
+						checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
+
+				for (MasterState s : masterStates) {
+					checkpoint.addMasterState(s);
+				}
+			}
+			catch (FlinkException e) {
+				checkpoint.abortError(e);
+				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+			}
+
 			// schedule the timer that will clean up the expired checkpoints
 			final Runnable canceller = new Runnable() {
 				@Override
@@ -962,13 +1024,25 @@ public class CheckpointCoordinator {
 
 			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
 
+			// re-assign the task states
+
 			final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
 
 			StateAssignmentOperation stateAssignmentOperation =
 					new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
-
 			stateAssignmentOperation.assignStates();
 
+			// call master hooks for restore
+
+			MasterHooks.restoreMasterHooks(
+					masterHooks,
+					latest.getMasterHookStates(),
+					latest.getCheckpointID(),
+					allowNonRestoredState,
+					LOG);
+
+			// update metrics
+
 			if (statsTracker != null) {
 				long restoreTimestamp = System.currentTimeMillis();
 				RestoredCheckpointStats restored = new RestoredCheckpointStats(
@@ -1022,9 +1096,9 @@ public class CheckpointCoordinator {
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 	//  Accessors
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
 
 	public int getNumberOfPendingCheckpoints() {
 		return this.pendingCheckpoints.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 60fe657..41c50cc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -36,7 +36,9 @@ public enum CheckpointDeclineReason {
 
 	NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."),
 
-	EXCEPTION("An Exception occurred while triggering the checkpoint.");
+	EXCEPTION("An Exception occurred while triggering the checkpoint."),
+
+	EXPIRED("The checkpoint expired before triggering was complete");
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 79fc31f..bb49b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -27,11 +27,16 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -90,11 +95,14 @@ public class CompletedCheckpoint implements Serializable {
 	private final long duration;
 
 	/** States of the different task groups belonging to this checkpoint */
-	private final Map<JobVertexID, TaskState> taskStates;
+	private final HashMap<JobVertexID, TaskState> taskStates;
 
 	/** Properties for this checkpoint. */
 	private final CheckpointProperties props;
 
+	/** States that were created by a hook on the master (in the checkpoint coordinator) */
+	private final Collection<MasterState> masterHookStates;
+
 	/** The state handle to the externalized meta data, if the metadata has been externalized */
 	@Nullable
 	private final StreamStateHandle externalizedMetadata;
@@ -118,6 +126,7 @@ public class CompletedCheckpoint implements Serializable {
 			Map<JobVertexID, TaskState> taskStates) {
 
 		this(job, checkpointID, timestamp, completionTimestamp, taskStates,
+				Collections.<MasterState>emptyList(),
 				CheckpointProperties.forStandardCheckpoint());
 	}
 
@@ -127,9 +136,11 @@ public class CompletedCheckpoint implements Serializable {
 			long timestamp,
 			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates,
+			@Nullable Collection<MasterState> masterHookStates,
 			CheckpointProperties props) {
 
-		this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null);
+		this(job, checkpointID, timestamp, completionTimestamp, taskStates, 
+				masterHookStates, props, null, null);
 	}
 
 	public CompletedCheckpoint(
@@ -138,6 +149,7 @@ public class CompletedCheckpoint implements Serializable {
 			long timestamp,
 			long completionTimestamp,
 			Map<JobVertexID, TaskState> taskStates,
+			@Nullable Collection<MasterState> masterHookStates,
 			CheckpointProperties props,
 			@Nullable StreamStateHandle externalizedMetadata,
 			@Nullable String externalPointer) {
@@ -156,7 +168,14 @@ public class CompletedCheckpoint implements Serializable {
 		this.checkpointID = checkpointID;
 		this.timestamp = timestamp;
 		this.duration = completionTimestamp - timestamp;
-		this.taskStates = checkNotNull(taskStates);
+
+		// we create copies here, to make sure we have no shared mutable
+		// data structure with the "outside world"
+		this.taskStates = new HashMap<>(checkNotNull(taskStates));
+		this.masterHookStates = masterHookStates == null || masterHookStates.isEmpty() ?
+				Collections.<MasterState>emptyList() :
+				new ArrayList<>(masterHookStates);
+
 		this.props = checkNotNull(props);
 		this.externalizedMetadata = externalizedMetadata;
 		this.externalPointer = externalPointer;
@@ -228,13 +247,17 @@ public class CompletedCheckpoint implements Serializable {
 	}
 
 	public Map<JobVertexID, TaskState> getTaskStates() {
-		return taskStates;
+		return Collections.unmodifiableMap(taskStates);
 	}
 
 	public TaskState getTaskState(JobVertexID jobVertexID) {
 		return taskStates.get(jobVertexID);
 	}
 
+	public Collection<MasterState> getMasterHookStates() {
+		return Collections.unmodifiableCollection(masterHookStates);
+	}
+
 	public boolean isExternalized() {
 		return externalizedMetadata != null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
new file mode 100644
index 0000000..2d09fdb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple encapsulation of state generated by the checkpoint coordinator.
+ */
+public class MasterState implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String name;
+	private final byte[] bytes;
+	private final int version;
+
+	public MasterState(String name, byte[] bytes, int version) {
+		this.name = checkNotNull(name);
+		this.bytes = checkNotNull(bytes);
+		this.version = version;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public String name() {
+		return name;
+	}
+
+	public byte[] bytes() {
+		return bytes;
+	}
+
+	public int version() {
+		return version;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "name: " + name + " ; version: " + version + " ; bytes: " + Arrays.toString(bytes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
new file mode 100644
index 0000000..e77ed57
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.Future;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
+/**
+ * The interface for hooks that can be called by the checkpoint coordinator when triggering or
+ * restoring a checkpoint. Such a hook is useful for example when preparing external systems for
+ * taking or restoring checkpoints.
+ * 
+ * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint)
+ * can return a result (via a future) that will be stored as part of the checkpoint metadata.
+ * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)}
+ * method. The hook's {@link #getIdentifier() identifier} is used to map data to the hook in the presence
+ * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
+ * The identifier has a similar role as for example the operator UID in the streaming API.
+ * 
+ * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
+ * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
+ * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
+ * 
+ * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata.
+ *            If the hook never stores any data, this can be typed to {@code Void}.
+ */
+public interface MasterTriggerRestoreHook<T> {
+
+	/**
+	 * Gets the identifier of this hook. The identifier is used to identify a specific hook in the
+	 * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration.
+	 * 
+	 * <p>The identifier should be unique between different hooks of a job, but deterministic/constant
+	 * so that upon resuming a savepoint, the hook will get the correct data.
+	 * For example, if the hook calls into another storage system and persists namespace/schema specific
+	 * information, then the name of the storage system, together with the namespace/schema name could
+	 * be an appropriate identifier.
+	 * 
+	 * <p>When multiple hooks of the same name are created and attached to a job graph, only the first
+	 * one is actually used. This can be exploited to deduplicate hooks that would do the same thing.
+	 * 
+	 * @return The identifier of the hook. 
+	 */
+	String getIdentifier();
+
+	/**
+	 * This method is called by the checkpoint coordinator when triggering a checkpoint, prior
+	 * to sending the "trigger checkpoint" messages to the source tasks.
+	 * 
+	 * <p>If the hook implementation wants to store data as part of the checkpoint, it may return
+	 * that data via a future, otherwise it should return null. The data is stored as part of
+	 * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}).
+	 * 
+	 * <p>If the action by this hook needs to be executed synchronously, then this method should
+	 * directly execute the action synchronously and block until it is complete. The returned future
+	 * (if any) would typically be a completed future.
+	 * 
+	 * <p>If the action should be executed asynchronously and only needs to complete before the
+	 * checkpoint is considered completed, then the method may use the given executor to execute the
+	 * actual action and would signal its completion by completing the future. For hooks that do not
+	 * need to store data, the future would be completed with null.
+	 * 
+	 * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint
+	 * @param timestamp The wall clock timestamp when the checkpoint was triggered, for
+	 *                  info/logging purposes. 
+	 * @param executor The executor for asynchronous actions
+	 * 
+	 * @return Optionally, a future that signals when the hook has completed and that contains
+	 *         data to be stored with the checkpoint.
+	 * 
+	 * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
+	 */
+	@Nullable
+	Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
+
+	/**
+	 * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
+	 * If the checkpoint did store data from this hook, that data will be passed to this method. 
+	 * 
+	 * @param checkpointId The ID (logical timestamp) of the restored checkpoint
+	 * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null. 
+	 * 
+	 * @throws Exception Exceptions thrown while restoring the checkpoint will cause the restore
+	 *                   operation to fail and to possibly fall back to another checkpoint. 
+	 */
+	void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception;
+
+	/**
+	 * Creates the serializer to (de)serialize the data stored by this hook. The serializer
+	 * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)}
+	 * method, and deserializes the data stored in the checkpoint into the object passed to the
+	 * {@link #restoreCheckpoint(long, Object)} method. 
+	 * 
+	 * <p>If the hook never returns any data to be stored, then this method may return null as the
+	 * serializer.
+	 * 
+	 * @return The serializer to (de)serialize the data stored by this hook
+	 */
+	@Nullable
+	SimpleVersionedSerializer<T> createCheckpointDataSerializer();
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A factory to instantiate a {@code MasterTriggerRestoreHook}.
+	 * 
+	 * <p>The hooks are defined when creating the streaming dataflow graph and are attached
+	 * to the job graph, which gets sent to the cluster for execution. To avoid having to make
+	 * the hook implementation serializable, a serializable hook factory is actually attached to the
+	 * job graph instead of the hook implementation itself.
+	 */
+	interface Factory extends java.io.Serializable {
+
+		/**
+		 * Instantiates the {@code MasterTriggerRestoreHook}.
+		 */
+		<V> MasterTriggerRestoreHook<V> create();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 900331b..ce97edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -41,8 +41,10 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -89,6 +91,8 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+	private final List<MasterState> masterState;
+
 	/** Set of acknowledged tasks */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
@@ -143,6 +147,7 @@ public class PendingCheckpoint {
 		this.executor = Preconditions.checkNotNull(executor);
 
 		this.taskStates = new HashMap<>();
+		this.masterState = new ArrayList<>();
 		this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
 		this.onCompletionPromise = new FlinkCompletableFuture<>();
 	}
@@ -256,7 +261,7 @@ public class PendingCheckpoint {
 			// make sure we fulfill the promise with an exception if something fails
 			try {
 				// externalize the metadata
-				final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+				final Savepoint savepoint = new SavepointV2(checkpointId, taskStates.values());
 
 				// TEMP FIX - The savepoint store is strictly typed to file systems currently
 				//            but the checkpoints think more generic. we need to work with file handles
@@ -321,7 +326,8 @@ public class PendingCheckpoint {
 				checkpointId,
 				checkpointTimestamp,
 				System.currentTimeMillis(),
-				new HashMap<>(taskStates),
+				taskStates,
+				masterState,
 				props,
 				externalMetadata,
 				externalPointer);
@@ -345,6 +351,17 @@ public class PendingCheckpoint {
 	}
 
 	/**
+	 * Registers a master state (state generated on the checkpoint coordinator rather than
+	 * by a task) with this pending checkpoint.
+	 *
+	 * @param state The master state to register; must not be null
+	 */
+	public void addMasterState(MasterState state) {
+		final MasterState checkedState = checkNotNull(state);
+		masterState.add(checkedState);
+	}
+
+	/**
 	 * Acknowledges the task with the given execution attempt id and the given subtask state.
 	 *
 	 * @param executionAttemptId of the acknowledged task

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
new file mode 100644
index 0000000..409019e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+	// ------------------------------------------------------------------------
+	//  checkpoint triggering
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Triggers all given master hooks one after the other (waiting for each hook's result)
+	 * and returns state objects for each hook that produced a state.
+	 *
+	 * @param hooks The hooks to trigger
+	 * @param checkpointId The checkpoint ID of the triggering checkpoint
+	 * @param timestamp The (informational) timestamp for the triggering checkpoint
+	 * @param executor An executor that can be used for asynchronous I/O calls
+	 * @param timeout The maximum time that a hook may take to complete
+	 *
+	 * @return A list containing all states produced by the hooks
+	 *
+	 * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+	 *                        serialization fails.
+	 */
+	public static List<MasterState> triggerMasterHooks(
+			Collection<MasterTriggerRestoreHook<?>> hooks,
+			long checkpointId,
+			long timestamp,
+			Executor executor,
+			Time timeout) throws FlinkException {
+
+		final ArrayList<MasterState> states = new ArrayList<>(hooks.size());
+
+		for (MasterTriggerRestoreHook<?> hook : hooks) {
+			MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
+			if (state != null) {
+				states.add(state);
+			}
+		}
+
+		states.trimToSize();
+		return states;
+	}
+
+	private static <T> MasterState triggerHook(
+			MasterTriggerRestoreHook<?> hook,
+			long checkpointId,
+			long timestamp,
+			Executor executor,
+			Time timeout) throws FlinkException {
+		// triggers a single hook and turns its (optional) result into a MasterState
+		@SuppressWarnings("unchecked")
+		final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+		final String id = typedHook.getIdentifier();
+		final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
+
+		// call the hook!
+		final Future<T> resultFuture;
+		try {
+			resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t);
+		}
+
+		// if there is a result future, wait for its completion
+		// in the future we want to make this asynchronous with futures (no pun intended)
+		if (resultFuture == null) {
+			return null;
+		}
+		else {
+			final T result;
+			try {
+				result = resultFuture.get(timeout.getSize(), timeout.getUnit());
+			}
+			catch (InterruptedException e) {
+				// cannot continue here - restore interrupt status and leave
+				Thread.currentThread().interrupt();
+				throw new FlinkException("Checkpoint master hook '" + id + "' was interrupted", e);
+			}
+			catch (ExecutionException e) {
+				throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
+			}
+			catch (TimeoutException e) {
+				throw new FlinkException("Checkpoint master hook '" + id +
+						"' did not complete in time (" + timeout + ')', e);
+			}
+
+			// a null result means the hook has no state to store for this checkpoint
+			if (result == null) {
+				return null;
+			}
+			else if (serializer != null) {
+				try {
+					final int version = serializer.getVersion();
+					final byte[] bytes = serializer.serialize(result);
+
+					return new MasterState(id, bytes, version);
+				}
+				catch (Throwable t) {
+					ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+					throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t);
+				}
+			}
+			else {
+				throw new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer");
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  checkpoint restoring
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Calls the restore method of the given master hooks, passing each hook the state whose name equals
+	 * the hook's key in {@code masterHooks}, or {@code null} if there is no such state.
+	 *
+	 * <p>If there is no state (or no hook) at all, no hook is called. If state is found for which
+	 * no hook exists, an {@link IllegalStateException} is thrown, unless {@code allowUnmatchedState} is set.
+	 *
+	 * @param masterHooks The hooks to call restore on
+	 * @param states The state to pass to the hooks
+	 * @param checkpointId The checkpoint ID of the restored checkpoint
+	 * @param allowUnmatchedState True, if state without a matching hook should be dropped instead of failing
+	 * @param log The logger for log messages
+	 *
+	 * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+	 *                        deserialization fails.
+	 */
+	public static void restoreMasterHooks(
+			final Map<String, MasterTriggerRestoreHook<?>> masterHooks,
+			final Collection<MasterState> states,
+			final long checkpointId,
+			final boolean allowUnmatchedState,
+			final Logger log) throws FlinkException {
+
+		// early out
+		if (states == null || states.isEmpty() || masterHooks == null || masterHooks.isEmpty()) {
+			log.info("No master state to restore");
+			return;
+		}
+
+		log.info("Calling master restore hooks");
+
+		// copy the hooks; entries are removed as matching state is found
+		final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = new LinkedHashMap<>(masterHooks);
+
+		// first, deserialize all hook state
+		final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>();
+
+		for (MasterState state : states) {
+			if (state != null) {
+				final String name = state.name();
+				final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);
+
+				if (hook != null) {
+					log.debug("Found state to restore for hook '{}'", name);
+
+					Object deserializedState = deserializeState(state, hook);
+					hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
+				}
+				else if (!allowUnmatchedState) {
+					throw new IllegalStateException("Found state '" + state.name() +
+							"' which is not resumed by any hook.");
+				}
+				else {
+					log.info("Dropping unmatched state from '{}'", name);
+				}
+			}
+		}
+
+		// now that all is deserialized, call the hooks
+		for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
+			restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
+		}
+
+		// trigger the remaining hooks without checkpointed state
+		for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
+			restoreHook(null, hook, checkpointId);
+		}
+	}
+
+	private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
+		@SuppressWarnings("unchecked")
+		final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+		final String id = hook.getIdentifier();
+		// the hook's own serializer reads the bytes and version stored in the MasterState
+		try {
+			final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
+			if (deserializer == null) {
+				throw new FlinkException("null serializer for state of hook " + id);
+			}
+			return deserializer.deserialize(state.version(), state.bytes());
+		}
+		catch (Throwable t) {
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t);
+		}
+	}
+
+	private static <T> void restoreHook(
+			final Object state,
+			final MasterTriggerRestoreHook<?> hook,
+			final long checkpointId) throws FlinkException {
+		// restores a single hook; 'state' is null when the hook has no checkpointed state
+		@SuppressWarnings("unchecked")
+		final T typedState = (T) state;
+
+		@SuppressWarnings("unchecked")
+		final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+		try {
+			typedHook.restoreCheckpoint(checkpointId, typedState);
+		}
+		catch (FlinkException e) {
+			throw e;
+		}
+		catch (Throwable t) {
+			// catch all here, including Errors that may come from dependency and classpath issues
+			ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+			throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '"
+					+ hook.getIdentifier() + '\'', t);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated */
+	private MasterHooks() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index baad05f..79ec596 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.io.Versioned;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 
 import java.util.Collection;
@@ -58,6 +59,11 @@ public interface Savepoint extends Versioned {
 	Collection<TaskState> getTaskStates();
 
 	/**
+	 * Gets the checkpointed states generated by the master.
+	 */
+	Collection<MasterState> getMasterStates();
+
+	/**
 	 * Disposes the savepoint.
 	 */
 	void dispose() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 60f0287..8ee38da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,7 +123,7 @@ public class SavepointLoader {
 		// (3) convert to checkpoint so the system can fall back to it
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
 		return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L,
-				taskStates, props, metadataHandle, savepointPath);
+				taskStates, savepoint.getMasterStates(), props, metadataHandle, savepointPath);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 3155d60..c1fcf4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
 import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.util.Preconditions;
 
@@ -30,14 +31,16 @@ import java.util.Map;
 public class SavepointSerializers {
 
 
-	private static final int SAVEPOINT_VERSION_0 = 0;
 	private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
 
 	static {
-		SERIALIZERS.put(SAVEPOINT_VERSION_0, SavepointV0Serializer.INSTANCE);
+		SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
 		SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
+		SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE);
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Returns the {@link SavepointSerializer} for the given savepoint.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
index 5976bbf..196c870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Preconditions;
 
@@ -60,36 +61,21 @@ public class SavepointV1 implements Savepoint {
 	}
 
 	@Override
-	public void dispose() throws Exception {
-		for (TaskState taskState : taskStates) {
-			taskState.discardState();
-		}
-		taskStates.clear();
-	}
-
-	@Override
-	public String toString() {
-		return "Savepoint(version=" + VERSION + ")";
+	public Collection<MasterState> getMasterStates() {
+		// since checkpoints are never deserialized into this format,
+		// this method should never be called
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		SavepointV1 that = (SavepointV1) o;
-		return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
+	public void dispose() throws Exception {
+		// since checkpoints are never deserialized into this format,
+		// this method should never be called
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public int hashCode() {
-		int result = (int) (checkpointId ^ (checkpointId >>> 32));
-		result = 31 * result + taskStates.hashCode();
-		return result;
+	public String toString() {
+		return "Savepoint(version=" + VERSION + ")";
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 44461d8..ae9f4a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,18 +38,19 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Serializer for {@link SavepointV1} instances.
- * <p>
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
+ * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
+ * 
+ * <p>In contrast to the previous versions, this serializer makes sure that no Java
+ * serialization is used for serialization. Therefore, we don't rely on any involved 
+ * classes to stay the same.
  */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 
 	private static final byte NULL_HANDLE = 0;
 	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -63,39 +65,12 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 	}
 
 	@Override
-	public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
-		try {
-			dos.writeLong(savepoint.getCheckpointId());
-
-			Collection<TaskState> taskStates = savepoint.getTaskStates();
-			dos.writeInt(taskStates.size());
-
-			for (TaskState taskState : savepoint.getTaskStates()) {
-				// Vertex ID
-				dos.writeLong(taskState.getJobVertexID().getLowerPart());
-				dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-				// Parallelism
-				int parallelism = taskState.getParallelism();
-				dos.writeInt(parallelism);
-				dos.writeInt(taskState.getMaxParallelism());
-				dos.writeInt(taskState.getChainLength());
-
-				// Sub task states
-				Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
-				dos.writeInt(subtaskStateMap.size());
-				for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
-					dos.writeInt(entry.getKey());
-					serializeSubtaskState(entry.getValue(), dos);
-				}
-			}
-		} catch (Exception e) {
-			throw new IOException(e);
-		}
+	public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
+		throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
 	}
 
 	@Override
-	public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
 		long checkpointId = dis.readLong();
 
 		// Task states
@@ -122,7 +97,34 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 			}
 		}
 
-		return new SavepointV1(checkpointId, taskStates);
+		return new SavepointV2(checkpointId, taskStates, Collections.<MasterState>emptyList());
+	}
+
+	public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+		// legacy format 1 layout: checkpoint ID, number of task states, then every task state
+		dos.writeLong(savepoint.getCheckpointId());
+		final Collection<TaskState> taskStates = savepoint.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		for (TaskState state : taskStates) {
+			// vertex ID: lower part first, then upper part
+			final JobVertexID vertexId = state.getJobVertexID();
+			dos.writeLong(vertexId.getLowerPart());
+			dos.writeLong(vertexId.getUpperPart());
+
+			// parallelism, max parallelism, chain length
+			dos.writeInt(state.getParallelism());
+			dos.writeInt(state.getMaxParallelism());
+			dos.writeInt(state.getChainLength());
+
+			// number of subtask states, then (subtask index, subtask state) pairs
+			final Map<Integer, SubtaskState> subtasks = state.getSubtaskStates();
+			dos.writeInt(subtasks.size());
+			for (Map.Entry<Integer, SubtaskState> subtask : subtasks.entrySet()) {
+				dos.writeInt(subtask.getKey());
+				serializeSubtaskState(subtask.getValue(), dos);
+			}
+		}
 	}
 
 	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
new file mode 100644
index 0000000..100982d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The persistent checkpoint metadata, format version 2.
+ * This format was introduced with Flink 1.3.0.
+ */
+public class SavepointV2 implements Savepoint {
+
+	/** The savepoint version. */
+	public static final int VERSION = 2;
+
+	/** The checkpoint ID. */
+	private final long checkpointId;
+
+	/** The task states. */
+	private final Collection<TaskState> taskStates;
+
+	/** The states generated by the CheckpointCoordinator (master states). */
+	private final Collection<MasterState> masterStates;
+
+	/** Creates checkpoint metadata without any master state. */
+	public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
+		this(checkpointId, taskStates, Collections.<MasterState>emptyList());
+	}
+
+	public SavepointV2(long checkpointId, Collection<TaskState> taskStates, Collection<MasterState> masterStates) {
+		this.checkpointId = checkpointId;
+		this.taskStates = checkNotNull(taskStates, "taskStates");
+		this.masterStates = checkNotNull(masterStates, "masterStates");
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	@Override
+	public Collection<TaskState> getTaskStates() {
+		return taskStates;
+	}
+
+	@Override
+	public Collection<MasterState> getMasterStates() {
+		return masterStates;
+	}
+
+	@Override
+	public void dispose() throws Exception {
+		for (TaskState taskState : taskStates) {
+			taskState.discardState();
+		}
+		taskStates.clear();
+		masterStates.clear();
+	}
+
+	@Override
+	public String toString() {
+		return "Checkpoint Metadata (version=" + VERSION + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
new file mode 100644
index 0000000..307ea16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * (De)serializer for checkpoint metadata format version 2.
+ * 
+ * <p>This format version adds
+ * 
+ * <p>Basic checkpoint metadata layout:
+ * <pre>
+ *  +--------------+---------------+-----------------+
+ *  | checkpointID | master states | operator states |
+ *  +--------------+---------------+-----------------+
+ *  
+ *  Master state:
+ *  +--------------+---------------------+---------+------+---------------+
+ *  | magic number | num remaining bytes | version | name | payload bytes |
+ *  +--------------+---------------------+---------+------+---------------+
+ * </pre>
+ */
+class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+
+	/** Random magic number for consistency checks */
+	private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
+
+	private static final byte NULL_HANDLE = 0;
+	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+	private static final byte FILE_STREAM_STATE_HANDLE = 2;
+	private static final byte KEY_GROUPS_HANDLE = 3;
+	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+
+	/** The singleton instance of the serializer */
+	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+	// ------------------------------------------------------------------------
+
+	/** Singleton, not meant to be instantiated */
+	private SavepointV2Serializer() {}
+
+	// ------------------------------------------------------------------------
+	//  (De)serialization entry points
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+		// first: checkpoint ID
+		dos.writeLong(checkpointMetadata.getCheckpointId());
+
+		// second: master state
+		final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
+		dos.writeInt(masterStates.size());
+		for (MasterState ms : masterStates) {
+			serializeMasterState(ms, dos);
+		}
+
+		// third: task states
+		final Collection<TaskState> taskStates = checkpointMetadata.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		for (TaskState taskState : checkpointMetadata.getTaskStates()) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			int parallelism = taskState.getParallelism();
+			dos.writeInt(parallelism);
+			dos.writeInt(taskState.getMaxParallelism());
+			dos.writeInt(taskState.getChainLength());
+
+			// Sub task states
+			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+			dos.writeInt(subtaskStateMap.size());
+			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+				dos.writeInt(entry.getKey());
+				serializeSubtaskState(entry.getValue(), dos);
+			}
+		}
+	}
+
+	@Override
+	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+		// first: checkpoint ID
+		final long checkpointId = dis.readLong();
+		if (checkpointId < 0) {
+			throw new IOException("invalid checkpoint ID: " + checkpointId);
+		}
+
+		// second: master state
+		final List<MasterState> masterStates;
+		final int numMasterStates = dis.readInt();
+
+		if (numMasterStates == 0) {
+			masterStates = Collections.emptyList();
+		}
+		else if (numMasterStates > 0) {
+			masterStates = new ArrayList<>(numMasterStates);
+			for (int i = 0; i < numMasterStates; i++) {
+				masterStates.add(deserializeMasterState(dis));
+			}
+		}
+		else {
+			throw new IOException("invalid number of master states: " + numMasterStates);
+		}
+
+		// third: task states
+		final int numTaskStates = dis.readInt();
+		final ArrayList<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+			int maxParallelism = dis.readInt();
+			int chainLength = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = dis.readInt();
+
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+				SubtaskState subtaskState = deserializeSubtaskState(dis);
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+		}
+
+		return new SavepointV2(checkpointId, taskStates, masterStates);
+	}
+
+	// ------------------------------------------------------------------------
+	//  master state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+		// magic number for error detection
+		dos.writeInt(MASTER_STATE_MAGIC_NUMBER);
+
+		// for safety, we serialize first into an array and then write the array and its
+		// length into the checkpoint
+		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		final DataOutputStream out = new DataOutputStream(baos);
+
+		out.writeInt(state.version());
+		out.writeUTF(state.name());
+
+		final byte[] bytes = state.bytes();
+		out.writeInt(bytes.length);
+		out.write(bytes, 0, bytes.length);
+
+		out.close();
+		byte[] data = baos.toByteArray();
+
+		dos.writeInt(data.length);
+		dos.write(data, 0, data.length);
+	}
+
+	private MasterState deserializeMasterState(DataInputStream dis) throws IOException {
+		final int magicNumber = dis.readInt();
+		if (magicNumber != MASTER_STATE_MAGIC_NUMBER) {
+			throw new IOException("incorrect magic number in master styte byte sequence");
+		}
+
+		final int numBytes = dis.readInt();
+		if (numBytes <= 0) {
+			throw new IOException("found zero or negative length for master state bytes");
+		}
+
+		final byte[] data = new byte[numBytes];
+		dis.readFully(data);
+
+		final DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+		final int version = in.readInt();
+		final String name = in.readUTF();
+
+		final byte[] bytes = new byte[in.readInt()];
+		in.readFully(bytes);
+
+		// check that the data is not corrupt
+		if (in.read() != -1) {
+			throw new IOException("found trailing bytes in master state");
+		}
+
+		return new MasterState(name, bytes, version);
+	}
+
+	// ------------------------------------------------------------------------
+	//  task state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+
+		dos.writeLong(-1);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
+
+		int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle stateHandle = nonPartitionableState.get(i);
+			serializeStreamStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+		len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+		serializeKeyedStateHandle(keyedStateBackend, dos);
+
+		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+		serializeKeyedStateHandle(keyedStateStream, dos);
+	}
+
+	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+		// Duration field has been removed from SubtaskState
+		long ignoredDuration = dis.readLong();
+
+		int len = dis.readInt();
+		List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
+			nonPartitionableState.add(streamStateHandle);
+		}
+
+
+		len = dis.readInt();
+		List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateBackend.add(streamStateHandle);
+		}
+
+		len = dis.readInt();
+		List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateStream.add(streamStateHandle);
+		}
+
+		KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
+
+		KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain =
+				new ChainedStateHandle<>(nonPartitionableState);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
+				new ChainedStateHandle<>(operatorStateBackend);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
+				new ChainedStateHandle<>(operatorStateStream);
+
+		return new SubtaskState(
+				nonPartitionableStateChain,
+				operatorStateBackendChain,
+				operatorStateStreamChain,
+				keyedStateBackend,
+				keyedStateStream);
+	}
+
+	private static void serializeKeyedStateHandle(
+			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+		} else if (stateHandle instanceof KeyGroupsStateHandle) {
+			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+			dos.writeByte(KEY_GROUPS_HANDLE);
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+			}
+			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+		} else {
+			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+		}
+	}
+
+	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (KEY_GROUPS_HANDLE == type) {
+			int startKeyGroup = dis.readInt();
+			int numKeyGroups = dis.readInt();
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+			long[] offsets = new long[numKeyGroups];
+			for (int i = 0; i < numKeyGroups; ++i) {
+				offsets[i] = dis.readLong();
+			}
+			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
+				keyGroupRange, offsets);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeOperatorStateHandle(
+			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle != null) {
+			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+					stateHandle.getStateNameToPartitionOffsets();
+			dos.writeInt(partitionOffsetsMap.size());
+			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+				dos.writeUTF(entry.getKey());
+
+				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+				int mode = stateMetaInfo.getDistributionMode().ordinal();
+				dos.writeByte(mode);
+
+				long[] offsets = stateMetaInfo.getOffsets();
+				dos.writeInt(offsets.length);
+				for (long offset : offsets) {
+					dos.writeLong(offset);
+				}
+			}
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
+		}
+	}
+
+	private static OperatorStateHandle deserializeOperatorStateHandle(
+			DataInputStream dis) throws IOException {
+
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+			int mapSize = dis.readInt();
+			Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
+			for (int i = 0; i < mapSize; ++i) {
+				String key = dis.readUTF();
+
+				int modeOrdinal = dis.readByte();
+				OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
+
+				long[] offsets = new long[dis.readInt()];
+				for (int j = 0; j < offsets.length; ++j) {
+					offsets[j] = dis.readLong();
+				}
+
+				OperatorStateHandle.StateMetaInfo metaInfo =
+						new OperatorStateHandle.StateMetaInfo(offsets, mode);
+				offsetsMap.put(key, metaInfo);
+			}
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new OperatorStateHandle(offsetsMap, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeStreamStateHandle(
+			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+
+		} else if (stateHandle instanceof FileStateHandle) {
+			dos.writeByte(FILE_STREAM_STATE_HANDLE);
+			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+			dos.writeLong(stateHandle.getStateSize());
+			dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+		} else if (stateHandle instanceof ByteStreamStateHandle) {
+			dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+			ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+			dos.writeUTF(byteStreamStateHandle.getHandleName());
+			byte[] internalData = byteStreamStateHandle.getData();
+			dos.writeInt(internalData.length);
+			dos.write(byteStreamStateHandle.getData());
+
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+		}
+
+		dos.flush();
+	}
+
+	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		final int type = dis.read();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			long size = dis.readLong();
+			String pathString = dis.readUTF();
+			return new FileStateHandle(new Path(pathString), size);
+		} else if (BYTE_STREAM_STATE_HANDLE == type) {
+			String handleName = dis.readUTF();
+			int numBytes = dis.readInt();
+			byte[] data = new byte[numBytes];
+			dis.readFully(data);
+			return new ByteStreamStateHandle(handleName, data);
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 29b9806..23ed99d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
+			List<MasterTriggerRestoreHook<?>> masterHooks,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			String checkpointDir,
@@ -395,6 +397,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			checkpointDir,
 			ioExecutor);
 
+		// register the master hooks on the checkpoint coordinator
+		for (MasterTriggerRestoreHook<?> hook : masterHooks) {
+			if (!checkpointCoordinator.addMasterHook(hook)) {
+				LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
+			}
+		}
+
 		checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
 
 		// interval of max long value indicates disable periodic checkpoint,

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a10c62e..b40817f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -230,6 +232,21 @@ public class ExecutionGraphBuilder {
 				}
 			}
 
+			// instantiate the user-defined checkpoint hooks
+
+			final MasterTriggerRestoreHook.Factory[] hookFactories = snapshotSettings.getMasterHooks();
+			final List<MasterTriggerRestoreHook<?>> hooks;
+
+			if (hookFactories == null || hookFactories.length == 0) {
+				hooks = Collections.emptyList();
+			}
+			else {
+				hooks = new ArrayList<>(hookFactories.length);
+				for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
+					hooks.add(factory.create());
+				}
+			}
+
 			executionGraph.enableCheckpointing(
 					snapshotSettings.getCheckpointInterval(),
 					snapshotSettings.getCheckpointTimeout(),
@@ -239,6 +256,7 @@ public class ExecutionGraphBuilder {
 					triggerVertices,
 					ackVertices,
 					confirmVertices,
+					hooks,
 					checkpointIdCounter,
 					completedCheckpoints,
 					externalizedCheckpointsDir,