You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/04/26 21:42:48 UTC
[3/3] flink git commit: [FLINK-6390] [checkpoints] Add API for
checkpoints that are triggered via external systems
[FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems
This includes:
- An interface for hooks that are called by the checkpoint coordinator to trigger/restore a checkpoint
- A source extension that triggers the operator checkpoints and barrier injection on certain events
Because this changes the checkpoint metadata format, the commit introduces a new metadata format version.
This closes #3782
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90ca4381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90ca4381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90ca4381
Branch: refs/heads/master
Commit: 90ca438106e63c5032ee2ad27e54e9f573eac386
Parents: 6bdaf1e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 27 17:20:47 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 26 21:56:45 2017 +0200
----------------------------------------------------------------------
.../core/io/SimpleVersionedSerializer.java | 80 ++++
.../java/org/apache/flink/util/StringUtils.java | 7 +
.../checkpoint/savepoint/SavepointV0.java | 10 +
.../savepoint/SavepointV0Serializer.java | 12 +-
.../checkpoint/CheckpointCoordinator.java | 80 +++-
.../checkpoint/CheckpointDeclineReason.java | 4 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 31 +-
.../flink/runtime/checkpoint/MasterState.java | 62 +++
.../checkpoint/MasterTriggerRestoreHook.java | 140 ++++++
.../runtime/checkpoint/PendingCheckpoint.java | 23 +-
.../runtime/checkpoint/hooks/MasterHooks.java | 273 +++++++++++
.../runtime/checkpoint/savepoint/Savepoint.java | 6 +
.../checkpoint/savepoint/SavepointLoader.java | 3 +-
.../savepoint/SavepointSerializers.java | 7 +-
.../checkpoint/savepoint/SavepointV1.java | 36 +-
.../savepoint/SavepointV1Serializer.java | 76 +--
.../checkpoint/savepoint/SavepointV2.java | 91 ++++
.../savepoint/SavepointV2Serializer.java | 468 +++++++++++++++++++
.../runtime/executiongraph/ExecutionGraph.java | 9 +
.../executiongraph/ExecutionGraphBuilder.java | 18 +
.../tasks/JobCheckpointingSettings.java | 48 +-
.../CheckpointCoordinatorMasterHooksTest.java | 421 +++++++++++++++++
.../CompletedCheckpointStoreTest.java | 2 +-
.../checkpoint/CompletedCheckpointTest.java | 24 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 1 +
.../savepoint/CheckpointTestUtils.java | 184 ++++++++
.../savepoint/SavepointLoaderTest.java | 2 +-
.../savepoint/SavepointStoreTest.java | 27 +-
.../savepoint/SavepointV1SerializerTest.java | 17 +-
.../checkpoint/savepoint/SavepointV1Test.java | 157 -------
.../savepoint/SavepointV2SerializerTest.java | 148 ++++++
.../checkpoint/savepoint/SavepointV2Test.java | 68 +++
.../ArchivedExecutionGraphTest.java | 2 +
.../api/checkpoint/ExternallyInducedSource.java | 75 +++
.../checkpoint/WithMasterCheckpointHook.java | 38 ++
.../FunctionMasterCheckpointHookFactory.java | 45 ++
.../api/graph/StreamingJobGraphGenerator.java | 28 +-
.../runtime/tasks/SourceStreamTask.java | 56 +++
.../WithMasterCheckpointHookConfigTest.java | 189 ++++++++
.../runtime/io/StreamRecordWriterTest.java | 5 -
.../SourceExternalCheckpointTriggerTest.java | 171 +++++++
.../runtime/tasks/StreamTaskTestHarness.java | 7 +-
.../test/checkpointing/SavepointITCase.java | 4 +-
43 files changed, 2874 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
new file mode 100644
index 0000000..6c061a5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * A simple serializer interface for versioned serialization.
+ *
+ * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
+ * to the serialized data. When the serializer evolves, the version can be used to identify
+ * with which prior version the data was serialized.
+ *
+ * <pre>{@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ *
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
+ *
+ * }</pre>
+ *
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+ /**
+ * Gets the version with which this serializer serializes.
+ *
+ * @return The version of the serialization schema.
+ */
+ @Override
+ int getVersion();
+
+ /**
+ * Serializes the given object. The serialization is assumed to correspond to the
+ * current serialization version (as returned by {@link #getVersion()}).
+ *
+ *
+ * @param obj The object to serialize.
+ * @return The serialized data (bytes).
+ *
+ * @throws IOException Thrown, if the serialization fails.
+ */
+ byte[] serialize(E obj) throws IOException;
+
+ /**
+ * De-serializes the given data (bytes) which was serialized with the scheme of the
+ * indicated version.
+ *
+ * @param version The version in which the data was serialized
+ * @param serialized The serialized data
+ * @return The deserialized object
+ *
+ * @throws IOException Thrown, if the deserialization fails.
+ */
+ E deserialize(int version, byte[] serialized) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b84f602..abd6ba6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -309,6 +309,13 @@ public final class StringUtils {
}
}
+ /**
+ * Checks if the string is null, empty, or contains only whitespace characters.
+ * A whitespace character is defined via {@link Character#isWhitespace(char)}.
+ *
+ * @param str The string to check
+ * @return True, if the string is null or blank, false otherwise.
+ */
public static boolean isNullOrWhitespaceOnly(String str) {
if (str == null || str.length() == 0) {
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
index 1c51a69..f3ec1cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
@@ -19,6 +19,7 @@
package org.apache.flink.migration.runtime.checkpoint.savepoint;
import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.util.Preconditions;
@@ -58,6 +59,15 @@ public class SavepointV0 implements Savepoint {
@Override
public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
+ // since checkpoints are never deserialized into this format,
+ // this method should never be called
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<MasterState> getMasterStates() {
+ // since checkpoints are never deserialized into this format,
+ // this method should never be called
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 4739033..d285906 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -34,7 +34,7 @@ import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.migration.util.SerializedValue;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -68,7 +68,7 @@ import java.util.Map;
* don't rely on any involved Java classes to stay the same.
*/
@SuppressWarnings("deprecation")
-public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV2> {
public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
@@ -81,12 +81,12 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
@Override
- public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+ public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
}
@Override
- public SavepointV1 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
+ public SavepointV2 deserialize(DataInputStream dis, ClassLoader userClassLoader) throws IOException {
long checkpointId = dis.readLong();
@@ -165,7 +165,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
return serializedValue;
}
- private SavepointV1 convertSavepoint(
+ private SavepointV2 convertSavepoint(
List<TaskState> taskStates,
ClassLoader userClassLoader,
long checkpointID) throws Exception {
@@ -176,7 +176,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
newTaskStates.add(convertTaskState(taskState, userClassLoader, checkpointID));
}
- return new SavepointV1(checkpointID, newTaskStates);
+ return new SavepointV2(checkpointID, newTaskStates);
}
private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 256321e..23a38d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.concurrent.ApplyFunction;
@@ -39,7 +41,10 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,6 +143,9 @@ public class CheckpointCoordinator {
/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
private final ScheduledThreadPoolExecutor timer;
+ /** The master checkpoint hooks executed by this checkpoint coordinator */
+ private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
+
/** Actor that receives status updates from the execution graph this coordinator works for */
private JobStatusListener jobStatusListener;
@@ -220,6 +228,7 @@ public class CheckpointCoordinator {
this.executor = checkNotNull(executor);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
+ this.masterHooks = new HashMap<>();
this.timer = new ScheduledThreadPoolExecutor(1,
new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
@@ -245,6 +254,45 @@ public class CheckpointCoordinator {
}
}
+ // --------------------------------------------------------------------------------------------
+ // Configuration
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Adds the given master hook to the checkpoint coordinator. This method does nothing if
+ * the checkpoint coordinator already contains a hook with the same ID (as defined via
+ * {@link MasterTriggerRestoreHook#getIdentifier()}).
+ *
+ * @param hook The hook to add. Must not be null and must have a non-blank identifier.
+ * @return True, if the hook was added, false if the checkpoint coordinator already
+ * contained a hook with the same ID.
+ */
+ public boolean addMasterHook(MasterTriggerRestoreHook<?> hook) {
+ checkNotNull(hook);
+ // the identifier is the key under which the hook is registered in 'masterHooks'
+ final String id = hook.getIdentifier();
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(id), "The hook has a null or empty id");
+ // first registration wins: a later hook with an already-known id is ignored
+ synchronized (lock) {
+ if (!masterHooks.containsKey(id)) {
+ masterHooks.put(id, hook);
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Gets the number of currently registered master hooks.
+ */
+ public int getNumberOfRegisteredMasterHooks() {
+ synchronized (lock) {
+ return masterHooks.size();
+ }
+ }
+
/**
* Sets the checkpoint stats tracker.
*
@@ -492,6 +540,20 @@ public class CheckpointCoordinator {
checkpoint.setStatsCallback(callback);
}
+ // trigger the master hooks for the checkpoint
+ try {
+ List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
+ checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
+
+ for (MasterState s : masterStates) {
+ checkpoint.addMasterState(s);
+ }
+ }
+ catch (FlinkException e) {
+ checkpoint.abortError(e);
+ return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+ }
+
// schedule the timer that will clean up the expired checkpoints
final Runnable canceller = new Runnable() {
@Override
@@ -962,13 +1024,25 @@ public class CheckpointCoordinator {
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+ // re-assign the task states
+
final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
StateAssignmentOperation stateAssignmentOperation =
new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
-
stateAssignmentOperation.assignStates();
+ // call master hooks for restore
+
+ MasterHooks.restoreMasterHooks(
+ masterHooks,
+ latest.getMasterHookStates(),
+ latest.getCheckpointID(),
+ allowNonRestoredState,
+ LOG);
+
+ // update metrics
+
if (statsTracker != null) {
long restoreTimestamp = System.currentTimeMillis();
RestoredCheckpointStats restored = new RestoredCheckpointStats(
@@ -1022,9 +1096,9 @@ public class CheckpointCoordinator {
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
// Accessors
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
public int getNumberOfPendingCheckpoints() {
return this.pendingCheckpoints.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 60fe657..41c50cc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -36,7 +36,9 @@ public enum CheckpointDeclineReason {
NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running."),
- EXCEPTION("An Exception occurred while triggering the checkpoint.");
+ EXCEPTION("An Exception occurred while triggering the checkpoint."),
+
+ EXPIRED("The checkpoint expired before triggering was complete");
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 79fc31f..bb49b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -27,11 +27,16 @@ import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -90,11 +95,14 @@ public class CompletedCheckpoint implements Serializable {
private final long duration;
/** States of the different task groups belonging to this checkpoint */
- private final Map<JobVertexID, TaskState> taskStates;
+ private final HashMap<JobVertexID, TaskState> taskStates;
/** Properties for this checkpoint. */
private final CheckpointProperties props;
+ /** States that were created by a hook on the master (in the checkpoint coordinator) */
+ private final Collection<MasterState> masterHookStates;
+
/** The state handle to the externalized meta data, if the metadata has been externalized */
@Nullable
private final StreamStateHandle externalizedMetadata;
@@ -118,6 +126,7 @@ public class CompletedCheckpoint implements Serializable {
Map<JobVertexID, TaskState> taskStates) {
this(job, checkpointID, timestamp, completionTimestamp, taskStates,
+ Collections.<MasterState>emptyList(),
CheckpointProperties.forStandardCheckpoint());
}
@@ -127,9 +136,11 @@ public class CompletedCheckpoint implements Serializable {
long timestamp,
long completionTimestamp,
Map<JobVertexID, TaskState> taskStates,
+ @Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props) {
- this(job, checkpointID, timestamp, completionTimestamp, taskStates, props, null, null);
+ this(job, checkpointID, timestamp, completionTimestamp, taskStates,
+ masterHookStates, props, null, null);
}
public CompletedCheckpoint(
@@ -138,6 +149,7 @@ public class CompletedCheckpoint implements Serializable {
long timestamp,
long completionTimestamp,
Map<JobVertexID, TaskState> taskStates,
+ @Nullable Collection<MasterState> masterHookStates,
CheckpointProperties props,
@Nullable StreamStateHandle externalizedMetadata,
@Nullable String externalPointer) {
@@ -156,7 +168,14 @@ public class CompletedCheckpoint implements Serializable {
this.checkpointID = checkpointID;
this.timestamp = timestamp;
this.duration = completionTimestamp - timestamp;
- this.taskStates = checkNotNull(taskStates);
+
+ // we create copies here, to make sure we have no shared mutable
+ // data structure with the "outside world"
+ this.taskStates = new HashMap<>(checkNotNull(taskStates));
+ this.masterHookStates = masterHookStates == null || masterHookStates.isEmpty() ?
+ Collections.<MasterState>emptyList() :
+ new ArrayList<>(masterHookStates);
+
this.props = checkNotNull(props);
this.externalizedMetadata = externalizedMetadata;
this.externalPointer = externalPointer;
@@ -228,13 +247,17 @@ public class CompletedCheckpoint implements Serializable {
}
public Map<JobVertexID, TaskState> getTaskStates() {
- return taskStates;
+ return Collections.unmodifiableMap(taskStates);
}
public TaskState getTaskState(JobVertexID jobVertexID) {
return taskStates.get(jobVertexID);
}
+ public Collection<MasterState> getMasterHookStates() {
+ return Collections.unmodifiableCollection(masterHookStates);
+ }
+
public boolean isExternalized() {
return externalizedMetadata != null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
new file mode 100644
index 0000000..2d09fdb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple encapsulation of state generated by the checkpoint coordinator: a name, the state bytes, and a version.
+ */
+public class MasterState implements java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+ private final byte[] bytes;
+ private final int version;
+
+ public MasterState(String name, byte[] bytes, int version) {
+ this.name = checkNotNull(name);
+ this.bytes = checkNotNull(bytes);
+ this.version = version;
+ }
+
+ // ------------------------------------------------------------------------
+ // NOTE(review): the byte array is neither copied in the constructor nor in bytes(); callers must not mutate it.
+ public String name() {
+ return name;
+ }
+
+ public byte[] bytes() {
+ return bytes;
+ }
+
+ public int version() {
+ return version;
+ }
+
+ // ------------------------------------------------------------------------
+ // NOTE(review): toString() renders the complete byte array, which can be long for large states.
+ @Override
+ public String toString() {
+ return "name: " + name + " ; version: " + version + " ; bytes: " + Arrays.toString(bytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
new file mode 100644
index 0000000..e77ed57
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.Future;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
+/**
+ * The interface for hooks that can be called by the checkpoint coordinator when triggering or
+ * restoring a checkpoint. Such a hook is useful for example when preparing external systems for
+ * taking or restoring checkpoints.
+ *
+ * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint)
+ * can return a result (via a future) that will be stored as part of the checkpoint metadata.
+ * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)}
+ * method. The hook's {@link #getIdentifier() identifier} is used to map the data to the right hook in the presence
+ * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
+ * The identifier has a similar role as for example the operator UID in the streaming API.
+ *
+ * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
+ * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
+ * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
+ *
+ * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata.
+ * If the hook never stores any data, this can be typed to {@code Void}.
+ */
+public interface MasterTriggerRestoreHook<T> {
+
+ /**
+ * Gets the identifier of this hook. The identifier is used to identify a specific hook in the
+ * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration.
+ *
+ * <p>The identifier should be unique between different hooks of a job, but deterministic/constant
+ * so that upon resuming a savepoint, the hook will get the correct data.
+ * For example, if the hook calls into another storage system and persists namespace/schema specific
+ * information, then the name of the storage system, together with the namespace/schema name could
+ * be an appropriate identifier.
+ *
+ * <p>When multiple hooks with the same identifier are created and attached to a job graph, only the first
+ * one is actually used. This can be exploited to deduplicate hooks that would do the same thing.
+ *
+ * @return The identifier of the hook.
+ */
+ String getIdentifier();
+
+ /**
+ * This method is called by the checkpoint coordinator when triggering a checkpoint, prior
+ * to sending the "trigger checkpoint" messages to the source tasks.
+ *
+ * <p>If the hook implementation wants to store data as part of the checkpoint, it may return
+ * that data via a future, otherwise it should return null. The data is stored as part of
+ * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}).
+ *
+ * <p>If the action by this hook needs to be executed synchronously, then this method should
+ * directly execute the action synchronously and block until it is complete. The returned future
+ * (if any) would typically be a completed future.
+ *
+ * <p>If the action should be executed asynchronously and only needs to complete before the
+ * checkpoint is considered completed, then the method may use the given executor to execute the
+ * actual action and would signal its completion by completing the future. For hooks that do not
+ * need to store data, the future would be completed with null.
+ *
+ * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint
+ * @param timestamp The wall clock timestamp when the checkpoint was triggered, for
+ * info/logging purposes.
+ * @param executor The executor for asynchronous actions
+ *
+ * @return Optionally, a future that signals when the hook has completed and that contains
+ * data to be stored with the checkpoint.
+ *
+ * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
+ */
+ @Nullable
+ Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
+
+ /**
+ * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
+ * If the checkpoint did store data from this hook, that data will be passed to this method.
+ *
+ * @param checkpointId The ID (logical timestamp) of the restored checkpoint
+ * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null.
+ *
+ * @throws Exception Exceptions thrown while restoring the checkpoint will cause the restore
+ * operation to fail and to possibly fall back to another checkpoint.
+ */
+ void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception;
+
+ /**
+ * Creates the serializer to (de)serialize the data stored by this hook. The serializer
+ * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)}
+ * method, and deserializes the data stored in the checkpoint into the object passed to the
+ * {@link #restoreCheckpoint(long, Object)} method.
+ *
+ * <p>If the hook never returns any data to be stored, then this method may return null as the
+ * serializer.
+ *
+ * @return The serializer to (de)serialize the data stored by this hook
+ */
+ @Nullable
+ SimpleVersionedSerializer<T> createCheckpointDataSerializer();
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory to instantiate a {@code MasterTriggerRestoreHook}.
+ *
+ * <p>The hooks are defined when creating the streaming dataflow graph and are attached
+ * to the job graph, which gets sent to the cluster for execution. To avoid having to make
+ * the hook implementation serializable, a serializable hook factory is actually attached to the
+ * job graph instead of the hook implementation itself.
+ */
+ interface Factory extends java.io.Serializable {
+
+ /**
+ * Instantiates the {@code MasterTriggerRestoreHook}. Note: {@code V} is chosen at the call site and is not checked against the hook's actual data type.
+ */
+ <V> MasterTriggerRestoreHook<V> create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 900331b..ce97edc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -41,8 +41,10 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -89,6 +91,8 @@ public class PendingCheckpoint {
private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
+ private final List<MasterState> masterState;
+
/** Set of acknowledged tasks */
private final Set<ExecutionAttemptID> acknowledgedTasks;
@@ -143,6 +147,7 @@ public class PendingCheckpoint {
this.executor = Preconditions.checkNotNull(executor);
this.taskStates = new HashMap<>();
+ this.masterState = new ArrayList<>();
this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
this.onCompletionPromise = new FlinkCompletableFuture<>();
}
@@ -256,7 +261,7 @@ public class PendingCheckpoint {
// make sure we fulfill the promise with an exception if something fails
try {
// externalize the metadata
- final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
+ final Savepoint savepoint = new SavepointV2(checkpointId, taskStates.values());
// TEMP FIX - The savepoint store is strictly typed to file systems currently
// but the checkpoints think more generic. we need to work with file handles
@@ -321,7 +326,8 @@ public class PendingCheckpoint {
checkpointId,
checkpointTimestamp,
System.currentTimeMillis(),
- new HashMap<>(taskStates),
+ taskStates,
+ masterState,
props,
externalMetadata,
externalPointer);
@@ -345,6 +351,17 @@ public class PendingCheckpoint {
}
/**
+ * Adds a master state (state generated on the checkpoint coordinator) to
+ * the pending checkpoint.
+ *
+ * @param state The state to add
+ */
+ public void addMasterState(MasterState state) {
+ // reject null before it can enter the list of master states
+ masterState.add(checkNotNull(state));
+ }
+
+ /**
* Acknowledges the task with the given execution attempt id and the given subtask state.
*
* @param executionAttemptId of the acknowledged task
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
new file mode 100644
index 0000000..409019e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public class MasterHooks {
+
+ // ------------------------------------------------------------------------
+ // checkpoint triggering
+ // ------------------------------------------------------------------------
+
+ /**
+ * Triggers all given master hooks and returns state objects for each hook that
+ * produced a state.
+ *
+ * <p>The hooks are triggered one after the other, in the iteration order of the given
+ * collection; each hook's result is awaited (up to the given timeout) before the next hook
+ * is triggered. Hooks that return no future, or whose future completes with null, contribute
+ * no state.
+ *
+ * @param hooks The hooks to trigger
+ * @param checkpointId The checkpoint ID of the triggering checkpoint
+ * @param timestamp The (informational) timestamp for the triggering checkpoint
+ * @param executor An executor that can be used for asynchronous I/O calls
+ * @param timeout The maximum time to wait for each hook's result
+ *
+ * @return A list containing all states produced by the hooks, in trigger order
+ *
+ * @throws FlinkException Thrown, if a hook throws an exception, does not complete in time,
+ * produces state without a serializer, or the state serialization fails.
+ */
+ public static List<MasterState> triggerMasterHooks(
+ Collection<MasterTriggerRestoreHook<?>> hooks,
+ long checkpointId,
+ long timestamp,
+ Executor executor,
+ Time timeout) throws FlinkException {
+
+ final ArrayList<MasterState> states = new ArrayList<>(hooks.size());
+
+ for (MasterTriggerRestoreHook<?> hook : hooks) {
+ MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
+ if (state != null) {
+ states.add(state);
+ }
+ }
+
+ states.trimToSize();
+ return states;
+ }
+
+ /**
+ * Triggers a single hook, waits for its result and serializes that result into a
+ * {@link MasterState}.
+ *
+ * @return The serialized state of the hook, or null, if the hook produced no state
+ */
+ private static <T> MasterState triggerHook(
+ MasterTriggerRestoreHook<?> hook,
+ long checkpointId,
+ long timestamp,
+ Executor executor,
+ Time timeout) throws FlinkException {
+
+ @SuppressWarnings("unchecked")
+ final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+ final String id = typedHook.getIdentifier();
+ final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
+
+ // call the hook!
+ final Future<T> resultFuture;
+ try {
+ resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t);
+ }
+
+ // no result future means the hook has no state for this checkpoint
+ if (resultFuture == null) {
+ return null;
+ }
+
+ // if there is a result future, wait for its completion
+ // in the future we want to make this asynchronous with futures (no pun intended)
+ final T result;
+ try {
+ result = resultFuture.get(timeout.getSize(), timeout.getUnit());
+ }
+ catch (InterruptedException e) {
+ // cannot continue here - restore interrupt status and leave
+ Thread.currentThread().interrupt();
+ throw new FlinkException("Checkpoint master hook '" + id + "' was interrupted", e);
+ }
+ catch (ExecutionException e) {
+ throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
+ }
+ catch (TimeoutException e) {
+ throw new FlinkException("Checkpoint master hook '" + id +
+ "' did not complete in time (" + timeout + ')', e);
+ }
+
+ // a null result means the hook has no state for this checkpoint
+ if (result == null) {
+ return null;
+ }
+
+ if (serializer == null) {
+ throw new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer");
+ }
+
+ try {
+ final int version = serializer.getVersion();
+ final byte[] bytes = serializer.serialize(result);
+
+ return new MasterState(id, bytes, version);
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // checkpoint restoring
+ // ------------------------------------------------------------------------
+
+ /**
+ * Calls the restore method of the given checkpoint master hooks and passes the given master
+ * state to them where state with a matching name is found.
+ *
+ * <p>All matching states are deserialized before any hook's restore method is called.
+ * Afterwards, hooks for which no state was found are called with {@code null} state.
+ * If there is no state at all, or there are no hooks, no hook is called.
+ *
+ * <p>If state is found and no hook with the same name is found, the method throws an
+ * exception, unless the {@code allowUnmatchedState} flag is set.
+ *
+ * @param masterHooks The hooks to call restore on, keyed by the name matched against the state names
+ * @param states The state to pass to the hooks
+ * @param checkpointId The checkpoint ID of the restored checkpoint
+ * @param allowUnmatchedState If true, state for which no hook exists is logged and dropped;
+ * if false, such state causes an {@link IllegalStateException}
+ * @param log The logger for log messages
+ *
+ * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+ * deserialization fails.
+ * @throws IllegalStateException Thrown, if state without a matching hook is found and
+ * {@code allowUnmatchedState} is false.
+ */
+ public static void restoreMasterHooks(
+ final Map<String, MasterTriggerRestoreHook<?>> masterHooks,
+ final Collection<MasterState> states,
+ final long checkpointId,
+ final boolean allowUnmatchedState,
+ final Logger log) throws FlinkException {
+
+ // early out
+ if (states == null || states.isEmpty() || masterHooks == null || masterHooks.isEmpty()) {
+ log.info("No master state to restore");
+ return;
+ }
+
+ log.info("Calling master restore hooks");
+
+ // collect the hooks; matched hooks are removed, so the rest are the hooks without state
+ final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = new LinkedHashMap<>(masterHooks);
+
+ // first, deserialize all hook state
+ final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>();
+
+ for (MasterState state : states) {
+ if (state != null) {
+ final String name = state.name();
+ final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);
+
+ if (hook != null) {
+ log.debug("Found state to restore for hook '{}'", name);
+
+ Object deserializedState = deserializeState(state, hook);
+ hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
+ }
+ else if (!allowUnmatchedState) {
+ throw new IllegalStateException("Found state '" + name +
+ "' which is not resumed by any hook.");
+ }
+ else {
+ log.info("Dropping unmatched state from '{}'", name);
+ }
+ }
+ }
+
+ // now that all is deserialized, call the hooks
+ for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
+ restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
+ }
+
+ // trigger the remaining hooks without checkpointed state
+ for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
+ restoreHook(null, hook, checkpointId);
+ }
+ }
+
+ /**
+ * Deserializes the given master state with the serializer created by the given hook.
+ *
+ * @throws FlinkException Thrown, if the hook creates no serializer, or the deserialization fails.
+ */
+ private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
+ @SuppressWarnings("unchecked")
+ final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+ final String id = hook.getIdentifier();
+
+ try {
+ final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
+ if (deserializer == null) {
+ throw new FlinkException("Cannot deserialize state for master hook '" + id +
+ "': the hook creates no serializer");
+ }
+
+ return deserializer.deserialize(state.version(), state.bytes());
+ }
+ catch (FlinkException e) {
+ // already describes the problem - do not wrap it a second time
+ throw e;
+ }
+ catch (Throwable t) {
+ // same fatal-error policy as the trigger path
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t);
+ }
+ }
+
+ /**
+ * Calls {@code restoreCheckpoint} on the given hook with the given (possibly null) state.
+ *
+ * @throws FlinkException Thrown, if the hook's restore method fails.
+ */
+ private static <T> void restoreHook(
+ final Object state,
+ final MasterTriggerRestoreHook<?> hook,
+ final long checkpointId) throws FlinkException {
+
+ @SuppressWarnings("unchecked")
+ final T typedState = (T) state;
+
+ @SuppressWarnings("unchecked")
+ final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+ try {
+ typedHook.restoreCheckpoint(checkpointId, typedState);
+ }
+ catch (FlinkException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ // catch all here, including Errors that may come from dependency and classpath issues,
+ // but let fatal JVM errors and OOMs through, consistent with the trigger path
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '"
+ + hook.getIdentifier() + '\'', t);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated */
+ private MasterHooks() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index baad05f..79ec596 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.core.io.Versioned;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.TaskState;
import java.util.Collection;
@@ -58,6 +59,11 @@ public interface Savepoint extends Versioned {
Collection<TaskState> getTaskStates();
/**
+ * Gets the checkpointed states generated by the master.
+ *
+ * @return The master states of this checkpoint. Implementations for legacy metadata
+ * formats may throw {@link UnsupportedOperationException} instead.
+ */
+ Collection<MasterState> getMasterStates();
+
+ /**
* Disposes the savepoint.
*/
void dispose() throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 60f0287..8ee38da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StreamStateHandle;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class SavepointLoader {
// (3) convert to checkpoint so the system can fall back to it
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L,
- taskStates, props, metadataHandle, savepointPath);
+ taskStates, savepoint.getMasterStates(), props, metadataHandle, savepointPath);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 3155d60..c1fcf4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
import org.apache.flink.util.Preconditions;
@@ -30,14 +31,16 @@ import java.util.Map;
public class SavepointSerializers {
- private static final int SAVEPOINT_VERSION_0 = 0;
private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2);
static {
+ // register the serializer for every known metadata format version, keyed by version number
- SERIALIZERS.put(SAVEPOINT_VERSION_0, SavepointV0Serializer.INSTANCE);
+ SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
+ SERIALIZERS.put(SavepointV2.VERSION, SavepointV2Serializer.INSTANCE);
}
+ // ------------------------------------------------------------------------
+
/**
* Returns the {@link SavepointSerializer} for the given savepoint.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
index 5976bbf..196c870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.util.Preconditions;
@@ -60,36 +61,21 @@ public class SavepointV1 implements Savepoint {
}
@Override
- public void dispose() throws Exception {
- for (TaskState taskState : taskStates) {
- taskState.discardState();
- }
- taskStates.clear();
- }
-
- @Override
- public String toString() {
- return "Savepoint(version=" + VERSION + ")";
+ public Collection<MasterState> getMasterStates() {
+ // format version 1 carries no master state, and metadata in format 1 is
+ // deserialized into SavepointV2 (see SavepointV1Serializer#deserialize).
+ // since checkpoints are never deserialized into this format,
+ // this method should never be called
+ throw new UnsupportedOperationException();
+ }
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SavepointV1 that = (SavepointV1) o;
- return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
+ public void dispose() throws Exception {
+ // metadata in format 1 is deserialized into SavepointV2, which owns the disposal logic.
+ // since checkpoints are never deserialized into this format,
+ // this method should never be called
+ throw new UnsupportedOperationException();
+ }
@Override
- public int hashCode() {
- int result = (int) (checkpointId ^ (checkpointId >>> 32));
- result = 31 * result + taskStates.hashCode();
- return result;
+ public String toString() {
+ // reports only the format version, not the checkpoint contents
+ return "Savepoint(version=" + VERSION + ")";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 44461d8..ae9f4a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,18 +38,19 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Serializer for {@link SavepointV1} instances.
- * <p>
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
+ * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
+ *
+ * <p>In contrast to the previous versions, this serializer makes sure that no Java
+ * serialization is used for serialization. Therefore, we don't rely on any involved
+ * classes to stay the same.
*/
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
private static final byte NULL_HANDLE = 0;
private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -63,39 +65,12 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
@Override
- public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
- try {
- dos.writeLong(savepoint.getCheckpointId());
-
- Collection<TaskState> taskStates = savepoint.getTaskStates();
- dos.writeInt(taskStates.size());
-
- for (TaskState taskState : savepoint.getTaskStates()) {
- // Vertex ID
- dos.writeLong(taskState.getJobVertexID().getLowerPart());
- dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
- // Parallelism
- int parallelism = taskState.getParallelism();
- dos.writeInt(parallelism);
- dos.writeInt(taskState.getMaxParallelism());
- dos.writeInt(taskState.getChainLength());
-
- // Sub task states
- Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
- dos.writeInt(subtaskStateMap.size());
- for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
- dos.writeInt(entry.getKey());
- serializeSubtaskState(entry.getValue(), dos);
- }
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
+ public void serialize(SavepointV2 savepoint, DataOutputStream dos) throws IOException {
+ // format 1 can only be read; this serializer is kept to load metadata written by older versions
+ throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
+ }
@Override
- public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+ public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
long checkpointId = dis.readLong();
// Task states
@@ -122,7 +97,34 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- return new SavepointV1(checkpointId, taskStates);
+ return new SavepointV2(checkpointId, taskStates, Collections.<MasterState>emptyList());
+ }
+
+ /**
+ * Writes the given savepoint in the metadata format 1 layout: the checkpoint ID, the number
+ * of task states, and for every task state its vertex ID, parallelism, max parallelism,
+ * chain length, and its subtask states keyed by subtask index.
+ *
+ * @param savepoint The savepoint to write
+ * @param dos The stream to write to
+ * @throws IOException Thrown, if writing to the stream fails
+ */
+ public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+ dos.writeLong(savepoint.getCheckpointId());
+
+ final Collection<TaskState> allTaskStates = savepoint.getTaskStates();
+ dos.writeInt(allTaskStates.size());
+
+ for (TaskState state : savepoint.getTaskStates()) {
+ // the vertex ID is written as its two 64 bit halves
+ final JobVertexID vertexId = state.getJobVertexID();
+ dos.writeLong(vertexId.getLowerPart());
+ dos.writeLong(vertexId.getUpperPart());
+
+ // parallelism settings and chain length
+ dos.writeInt(state.getParallelism());
+ dos.writeInt(state.getMaxParallelism());
+ dos.writeInt(state.getChainLength());
+
+ // subtask states, each prefixed by its subtask index
+ final Map<Integer, SubtaskState> subtaskStates = state.getSubtaskStates();
+ dos.writeInt(subtaskStates.size());
+ for (Map.Entry<Integer, SubtaskState> subtask : subtaskStates.entrySet()) {
+ dos.writeInt(subtask.getKey());
+ serializeSubtaskState(subtask.getValue(), dos);
+ }
+ }
}
private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
new file mode 100644
index 0000000..100982d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The persistent checkpoint metadata, format version 2.
+ * This format was introduced with Flink 1.3.0.
+ *
+ * <p>In addition to the task states, this format stores the states generated by the
+ * checkpoint master (see {@link MasterState}).
+ */
+public class SavepointV2 implements Savepoint {
+
+ /** The savepoint version. */
+ public static final int VERSION = 2;
+
+ /** The checkpoint ID */
+ private final long checkpointId;
+
+ /** The task states (a private copy of the collection passed to the constructor) */
+ private final Collection<TaskState> taskStates;
+
+ /** The states generated by the CheckpointCoordinator (a private copy) */
+ private final Collection<MasterState> masterStates;
+
+ /**
+ * Creates checkpoint metadata without any master states.
+ *
+ * @param checkpointId The checkpoint ID
+ * @param taskStates The task states; must not be null
+ */
+ public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
+ this(checkpointId, taskStates, Collections.<MasterState>emptyList());
+ }
+
+ /**
+ * Creates checkpoint metadata.
+ *
+ * <p>Both collections are copied, so that {@link #dispose()} only clears this object's
+ * collections and never a collection owned by the caller (for example a live view on a
+ * map's values), and so that unmodifiable input collections can be disposed as well.
+ *
+ * @param checkpointId The checkpoint ID
+ * @param taskStates The task states; must not be null
+ * @param masterStates The states generated by the master; must not be null
+ * @throws NullPointerException Thrown, if either collection is null
+ */
+ public SavepointV2(long checkpointId, Collection<TaskState> taskStates, Collection<MasterState> masterStates) {
+ this.checkpointId = checkpointId;
+ this.taskStates = new ArrayList<>(checkNotNull(taskStates, "taskStates"));
+ this.masterStates = new ArrayList<>(checkNotNull(masterStates, "masterStates"));
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ @Override
+ public Collection<TaskState> getTaskStates() {
+ return taskStates;
+ }
+
+ @Override
+ public Collection<MasterState> getMasterStates() {
+ return masterStates;
+ }
+
+ /**
+ * Discards the state of all task states and clears both collections of this object.
+ * The master states are only dropped from this object; nothing is discarded for them here.
+ */
+ @Override
+ public void dispose() throws Exception {
+ for (TaskState taskState : taskStates) {
+ taskState.discardState();
+ }
+ taskStates.clear();
+ masterStates.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "Checkpoint Metadata (version=" + VERSION + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
new file mode 100644
index 0000000..307ea16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * (De)serializer for checkpoint metadata format version 2.
+ *
+ * <p>This format version adds the master states (the states generated by the
+ * CheckpointCoordinator, see {@link MasterState}) to the checkpoint metadata. They are
+ * written between the checkpoint ID and the task (operator) states.
+ *
+ * <p>Basic checkpoint metadata layout:
+ * <pre>
+ *  +--------------+---------------+-----------------+
+ *  | checkpointID | master states | operator states |
+ *  +--------------+---------------+-----------------+
+ *
+ *  Master state:
+ *  +--------------+---------------------+---------+------+----------------+---------------+
+ *  | magic number | num remaining bytes | version | name | payload length | payload bytes |
+ *  +--------------+---------------------+---------+------+----------------+---------------+
+ * </pre>
+ *
+ * <p>The master states and the operator states are each prefixed with their count.
+ */
+class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+
+	/** Random magic number for consistency checks */
+	private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
+
+	// type tags written in front of each serialized state handle
+	private static final byte NULL_HANDLE = 0;
+	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+	private static final byte FILE_STREAM_STATE_HANDLE = 2;
+	private static final byte KEY_GROUPS_HANDLE = 3;
+	private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+
+	/** The singleton instance of the serializer */
+	public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+	// ------------------------------------------------------------------------
+
+	/** Singleton, not meant to be instantiated */
+	private SavepointV2Serializer() {}
+
+	// ------------------------------------------------------------------------
+	//  (De)serialization entry points
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes the checkpoint metadata: the checkpoint ID, then the master states, then the
+	 * task states. Both state collections are prefixed with their size.
+	 *
+	 * @param checkpointMetadata the metadata to write
+	 * @param dos the stream to write to
+	 * @throws IOException if writing to the stream fails
+	 */
+	@Override
+	public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+		// first: checkpoint ID
+		dos.writeLong(checkpointMetadata.getCheckpointId());
+
+		// second: master state
+		final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
+		dos.writeInt(masterStates.size());
+		for (MasterState ms : masterStates) {
+			serializeMasterState(ms, dos);
+		}
+
+		// third: task states
+		final Collection<TaskState> taskStates = checkpointMetadata.getTaskStates();
+		dos.writeInt(taskStates.size());
+
+		// iterate the very collection whose size was written, so count and entries agree
+		for (TaskState taskState : taskStates) {
+			// Vertex ID
+			dos.writeLong(taskState.getJobVertexID().getLowerPart());
+			dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+			// Parallelism
+			dos.writeInt(taskState.getParallelism());
+			dos.writeInt(taskState.getMaxParallelism());
+			dos.writeInt(taskState.getChainLength());
+
+			// Sub task states
+			Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+			dos.writeInt(subtaskStateMap.size());
+			for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+				dos.writeInt(entry.getKey());
+				serializeSubtaskState(entry.getValue(), dos);
+			}
+		}
+	}
+
+	/**
+	 * Reads checkpoint metadata in the layout written by {@link #serialize}.
+	 *
+	 * @param dis the stream to read from
+	 * @param cl the user class loader (not used by this format version)
+	 * @return the deserialized checkpoint metadata
+	 * @throws IOException if reading fails or the data is corrupt
+	 */
+	@Override
+	public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+		// first: checkpoint ID
+		final long checkpointId = dis.readLong();
+		if (checkpointId < 0) {
+			throw new IOException("invalid checkpoint ID: " + checkpointId);
+		}
+
+		// second: master state
+		final List<MasterState> masterStates;
+		final int numMasterStates = dis.readInt();
+
+		if (numMasterStates == 0) {
+			masterStates = Collections.emptyList();
+		}
+		else if (numMasterStates > 0) {
+			masterStates = new ArrayList<>(numMasterStates);
+			for (int i = 0; i < numMasterStates; i++) {
+				masterStates.add(deserializeMasterState(dis));
+			}
+		}
+		else {
+			throw new IOException("invalid number of master states: " + numMasterStates);
+		}
+
+		// third: task states
+		final int numTaskStates = readNonNegativeInt(dis, "number of task states");
+		final ArrayList<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int i = 0; i < numTaskStates; i++) {
+			JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+			int parallelism = dis.readInt();
+			int maxParallelism = dis.readInt();
+			int chainLength = dis.readInt();
+
+			// Add task state
+			TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
+			taskStates.add(taskState);
+
+			// Sub task states
+			int numSubTaskStates = readNonNegativeInt(dis, "number of subtask states");
+
+			for (int j = 0; j < numSubTaskStates; j++) {
+				int subtaskIndex = dis.readInt();
+				SubtaskState subtaskState = deserializeSubtaskState(dis);
+				taskState.putState(subtaskIndex, subtaskState);
+			}
+		}
+
+		return new SavepointV2(checkpointId, taskStates, masterStates);
+	}
+
+	// ------------------------------------------------------------------------
+	//  master state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Writes one master state: the magic number, the length of the following section, and the
+	 * section itself (version, name, payload length, payload bytes).
+	 */
+	private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+		// magic number for error detection
+		dos.writeInt(MASTER_STATE_MAGIC_NUMBER);
+
+		// for safety, we serialize first into an array and then write the array and its
+		// length into the checkpoint
+		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		final DataOutputStream out = new DataOutputStream(baos);
+
+		out.writeInt(state.version());
+		out.writeUTF(state.name());
+
+		final byte[] bytes = state.bytes();
+		out.writeInt(bytes.length);
+		out.write(bytes, 0, bytes.length);
+
+		// closing flushes the data output stream into the byte array
+		out.close();
+		byte[] data = baos.toByteArray();
+
+		dos.writeInt(data.length);
+		dos.write(data, 0, data.length);
+	}
+
+	/**
+	 * Reads one master state written by {@link #serializeMasterState}, validating the magic
+	 * number, the section length, and that the section is consumed exactly.
+	 */
+	private MasterState deserializeMasterState(DataInputStream dis) throws IOException {
+		final int magicNumber = dis.readInt();
+		if (magicNumber != MASTER_STATE_MAGIC_NUMBER) {
+			throw new IOException("incorrect magic number in master state byte sequence");
+		}
+
+		final int numBytes = dis.readInt();
+		if (numBytes <= 0) {
+			throw new IOException("found zero or negative length for master state bytes");
+		}
+
+		final byte[] data = new byte[numBytes];
+		dis.readFully(data);
+
+		final DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+		final int version = in.readInt();
+		final String name = in.readUTF();
+
+		final byte[] bytes = new byte[readNonNegativeInt(in, "master state payload length")];
+		in.readFully(bytes);
+
+		// check that the data is not corrupt: the section must be consumed exactly
+		if (in.read() != -1) {
+			throw new IOException("found trailing bytes in master state");
+		}
+
+		return new MasterState(name, bytes, version);
+	}
+
+	// ------------------------------------------------------------------------
+	//  task state (de)serialization methods
+	// ------------------------------------------------------------------------
+
+	private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+
+		// placeholder for the former duration field, which is skipped on deserialization
+		dos.writeLong(-1);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
+
+		int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle stateHandle = nonPartitionableState.get(i);
+			serializeStreamStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+		len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+		len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+		dos.writeInt(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+			serializeOperatorStateHandle(stateHandle, dos);
+		}
+
+		KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+		serializeKeyedStateHandle(keyedStateBackend, dos);
+
+		KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+		serializeKeyedStateHandle(keyedStateStream, dos);
+	}
+
+	private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+		// Duration field has been removed from SubtaskState; skip its placeholder value
+		dis.readLong();
+
+		int len = readNonNegativeInt(dis, "number of legacy operator state handles");
+		List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
+			nonPartitionableState.add(streamStateHandle);
+		}
+
+		len = readNonNegativeInt(dis, "number of managed operator state handles");
+		List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateBackend.add(streamStateHandle);
+		}
+
+		len = readNonNegativeInt(dis, "number of raw operator state handles");
+		List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
+		for (int i = 0; i < len; ++i) {
+			OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+			operatorStateStream.add(streamStateHandle);
+		}
+
+		KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
+
+		KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
+
+		ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain =
+				new ChainedStateHandle<>(nonPartitionableState);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
+				new ChainedStateHandle<>(operatorStateBackend);
+
+		ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
+				new ChainedStateHandle<>(operatorStateStream);
+
+		return new SubtaskState(
+				nonPartitionableStateChain,
+				operatorStateBackendChain,
+				operatorStateStreamChain,
+				keyedStateBackend,
+				keyedStateStream);
+	}
+
+	private static void serializeKeyedStateHandle(
+			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+		} else if (stateHandle instanceof KeyGroupsStateHandle) {
+			KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+			dos.writeByte(KEY_GROUPS_HANDLE);
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+			dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+			for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+				dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+			}
+			serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+		} else {
+			throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+		}
+	}
+
+	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (KEY_GROUPS_HANDLE == type) {
+			int startKeyGroup = dis.readInt();
+			int numKeyGroups = readNonNegativeInt(dis, "number of key groups");
+			KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+			long[] offsets = new long[numKeyGroups];
+			for (int i = 0; i < numKeyGroups; ++i) {
+				offsets[i] = dis.readLong();
+			}
+			KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
+					keyGroupRange, offsets);
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeOperatorStateHandle(
+			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle != null) {
+			dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+			Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+					stateHandle.getStateNameToPartitionOffsets();
+			dos.writeInt(partitionOffsetsMap.size());
+			for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+				dos.writeUTF(entry.getKey());
+
+				OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+				int mode = stateMetaInfo.getDistributionMode().ordinal();
+				dos.writeByte(mode);
+
+				long[] offsets = stateMetaInfo.getOffsets();
+				dos.writeInt(offsets.length);
+				for (long offset : offsets) {
+					dos.writeLong(offset);
+				}
+			}
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+		} else {
+			dos.writeByte(NULL_HANDLE);
+		}
+	}
+
+	private static OperatorStateHandle deserializeOperatorStateHandle(
+			DataInputStream dis) throws IOException {
+
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+			final OperatorStateHandle.Mode[] modes = OperatorStateHandle.Mode.values();
+			int mapSize = readNonNegativeInt(dis, "number of operator state partitions");
+			Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
+			for (int i = 0; i < mapSize; ++i) {
+				String key = dis.readUTF();
+
+				// the mode is written as its ordinal; reject values outside the enum's range
+				int modeOrdinal = dis.readByte();
+				if (modeOrdinal < 0 || modeOrdinal >= modes.length) {
+					throw new IOException("invalid operator state distribution mode: " + modeOrdinal);
+				}
+				OperatorStateHandle.Mode mode = modes[modeOrdinal];
+
+				long[] offsets = new long[readNonNegativeInt(dis, "number of partition offsets")];
+				for (int j = 0; j < offsets.length; ++j) {
+					offsets[j] = dis.readLong();
+				}
+
+				OperatorStateHandle.StateMetaInfo metaInfo =
+						new OperatorStateHandle.StateMetaInfo(offsets, mode);
+				offsetsMap.put(key, metaInfo);
+			}
+			StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+			return new OperatorStateHandle(offsetsMap, stateHandle);
+		} else {
+			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
+		}
+	}
+
+	private static void serializeStreamStateHandle(
+			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+
+		} else if (stateHandle instanceof FileStateHandle) {
+			dos.writeByte(FILE_STREAM_STATE_HANDLE);
+			FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+			dos.writeLong(stateHandle.getStateSize());
+			dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+		} else if (stateHandle instanceof ByteStreamStateHandle) {
+			dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+			ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+			dos.writeUTF(byteStreamStateHandle.getHandleName());
+			// write exactly the array whose length was written
+			byte[] internalData = byteStreamStateHandle.getData();
+			dos.writeInt(internalData.length);
+			dos.write(internalData);
+
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+		}
+
+		dos.flush();
+	}
+
+	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+		// readByte() (like the other handle readers) throws EOFException on truncated input,
+		// instead of returning -1 and reporting a bogus handle type
+		final int type = dis.readByte();
+		if (NULL_HANDLE == type) {
+			return null;
+		} else if (FILE_STREAM_STATE_HANDLE == type) {
+			long size = dis.readLong();
+			String pathString = dis.readUTF();
+			return new FileStateHandle(new Path(pathString), size);
+		} else if (BYTE_STREAM_STATE_HANDLE == type) {
+			String handleName = dis.readUTF();
+			int numBytes = readNonNegativeInt(dis, "byte stream state handle length");
+			byte[] data = new byte[numBytes];
+			dis.readFully(data);
+			return new ByteStreamStateHandle(handleName, data);
+		} else {
+			throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Reads an int denoting a count or a length and rejects negative values, which can only
+	 * come from corrupt metadata.
+	 *
+	 * @param dis the stream to read from
+	 * @param what description of the value, used in the error message
+	 * @return the non-negative value
+	 * @throws IOException if reading fails or the value is negative
+	 */
+	private static int readNonNegativeInt(DataInputStream dis, String what) throws IOException {
+		final int value = dis.readInt();
+		if (value < 0) {
+			throw new IOException("invalid " + what + ": " + value);
+		}
+		return value;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 29b9806..23ed99d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -360,6 +361,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
+ List<MasterTriggerRestoreHook<?>> masterHooks,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
String checkpointDir,
@@ -395,6 +397,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointDir,
ioExecutor);
+ // register the master hooks on the checkpoint coordinator
+ for (MasterTriggerRestoreHook<?> hook : masterHooks) {
+ if (!checkpointCoordinator.addMasterHook(hook)) {
+ LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
+ }
+ }
+
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
// interval of max long value indicates disable periodic checkpoint,
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a10c62e..b40817f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -230,6 +232,21 @@ public class ExecutionGraphBuilder {
}
}
+ // instantiate the user-defined checkpoint hooks
+
+ final MasterTriggerRestoreHook.Factory[] hookFactories = snapshotSettings.getMasterHooks();
+ final List<MasterTriggerRestoreHook<?>> hooks;
+
+ if (hookFactories == null || hookFactories.length == 0) {
+ hooks = Collections.emptyList();
+ }
+ else {
+ hooks = new ArrayList<>(hookFactories.length);
+ for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
+ hooks.add(factory.create());
+ }
+ }
+
executionGraph.enableCheckpointing(
snapshotSettings.getCheckpointInterval(),
snapshotSettings.getCheckpointTimeout(),
@@ -239,6 +256,7 @@ public class ExecutionGraphBuilder {
triggerVertices,
ackVertices,
confirmVertices,
+ hooks,
checkpointIdCounter,
completedCheckpoints,
externalizedCheckpointsDir,