You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:33 UTC
[5/7] flink git commit: [FLINK-5763] [checkpoints] Add
CheckpointOptions
[FLINK-5763] [checkpoints] Add CheckpointOptions
Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
to barrier injecting tasks) and barriers (flowing inline with the data):
```java
public class CheckpointOptions {
// Type of checkpoint
// => FULL_CHECKPOINT
// => SAVEPOINT
@NonNull
CheckpointType getCheckpointType();
// Custom target location. This is a String, because for future
// backends it can be a logical location like a DB table.
@Nullable
String getTargetLocation();
}
```
This class is the place to define further options for performing
checkpoints (for example, options for incremental checkpoints).
These options are forwarded via the `StreamTask` to the `StreamOperator`s and
`Snapshotable` backends. The `AbstractStreamOperator` checks the options and
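either i) forwards the shared per-operator `CheckpointStreamFactory` (as for regular checkpoints) or ii) creates a dedicated savepoint stream factory that writes to the savepoint's target location.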
For this, the state backends provide the following new method:
```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```
The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular sub folders
per checkpoint).
We end up with the following directory layout for savepoints:
```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
| +---------------------------------------+
+-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
+---------------------------------------+
|
+- _metadata (one per savepoint)
+- :uuid (one data file per StreamTask)
+- ...
+- :uuid
```
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174
Branch: refs/heads/master
Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18
Parents: 1f9f38b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 17:56:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../connectors/fs/RollingSinkITCase.java | 1 -
.../state/RocksDBKeyedStateBackend.java | 5 +-
.../streaming/state/RocksDBStateBackend.java | 9 ++
.../state/RocksDBAsyncSnapshotTest.java | 8 +-
.../state/RocksDBStateBackendTest.java | 15 +-
.../checkpoint/CheckpointCoordinator.java | 56 ++++++--
.../runtime/checkpoint/CheckpointOptions.java | 108 +++++++++++++++
.../runtime/checkpoint/CompletedCheckpoint.java | 2 +-
.../runtime/checkpoint/PendingCheckpoint.java | 3 +-
.../checkpoint/savepoint/SavepointStore.java | 137 +++++++++++++------
.../flink/runtime/executiongraph/Execution.java | 6 +-
.../io/network/api/CheckpointBarrier.java | 44 +++++-
.../api/serialization/EventSerializer.java | 59 +++++++-
.../runtime/jobgraph/tasks/StatefulTask.java | 7 +-
.../slots/ActorTaskManagerGateway.java | 6 +-
.../jobmanager/slots/TaskManagerGateway.java | 5 +-
.../jobmaster/RpcTaskManagerGateway.java | 3 +-
.../messages/checkpoint/TriggerCheckpoint.java | 19 ++-
.../state/AbstractKeyedStateBackend.java | 3 +-
.../runtime/state/AbstractStateBackend.java | 8 ++
.../state/DefaultOperatorStateBackend.java | 8 +-
.../flink/runtime/state/Snapshotable.java | 5 +-
.../flink/runtime/state/StateBackend.java | 22 +++
.../filesystem/FsCheckpointStreamFactory.java | 21 +--
.../filesystem/FsSavepointStreamFactory.java | 58 ++++++++
.../state/filesystem/FsStateBackend.java | 9 ++
.../state/heap/HeapKeyedStateBackend.java | 4 +-
.../state/memory/MemoryStateBackend.java | 9 ++
.../runtime/taskexecutor/TaskExecutor.java | 5 +-
.../taskexecutor/TaskExecutorGateway.java | 4 +-
.../apache/flink/runtime/taskmanager/Task.java | 10 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../flink/runtime/taskmanager/TaskManager.scala | 3 +-
.../checkpoint/CheckpointCoordinatorTest.java | 53 ++++---
.../checkpoint/CheckpointOptionsTest.java | 48 +++++++
.../checkpoint/CheckpointStatsHistoryTest.java | 1 +
.../savepoint/MigrationV0ToV1Test.java | 2 +-
.../savepoint/SavepointLoaderTest.java | 4 +-
.../savepoint/SavepointStoreTest.java | 48 +++++--
.../io/network/api/CheckpointBarrierTest.java | 61 +++++++++
.../api/serialization/EventSerializerTest.java | 45 ++++--
.../io/network/api/writer/RecordWriterTest.java | 5 +-
.../jobmanager/JobManagerHARecoveryTest.java | 5 +-
.../messages/CheckpointMessagesTest.java | 3 +-
.../runtime/state/OperatorStateBackendTest.java | 3 +-
.../runtime/state/StateBackendTestBase.java | 39 +++---
.../FsSavepointStreamFactoryTest.java | 67 +++++++++
.../runtime/taskmanager/TaskAsyncCallTest.java | 9 +-
.../api/operators/AbstractStreamOperator.java | 43 +++++-
.../api/operators/OperatorSnapshotResult.java | 2 +-
.../streaming/api/operators/StreamOperator.java | 12 +-
.../streaming/runtime/io/BarrierBuffer.java | 5 +-
.../streaming/runtime/io/BarrierTracker.java | 9 +-
.../streaming/runtime/tasks/OperatorChain.java | 5 +-
.../streaming/runtime/tasks/StreamTask.java | 65 +++++++--
.../api/checkpoint/ListCheckpointedTest.java | 2 +-
.../operators/AbstractStreamOperatorTest.java | 65 +++++----
.../AbstractUdfStreamOperatorLifecycleTest.java | 12 +-
.../WrappingFunctionSnapshotRestoreTest.java | 2 +-
.../operators/async/AsyncWaitOperatorTest.java | 5 +-
.../io/BarrierBufferAlignmentLimitTest.java | 13 +-
.../io/BarrierBufferMassiveRandomTest.java | 3 +-
.../streaming/runtime/io/BarrierBufferTest.java | 33 ++---
.../runtime/io/BarrierTrackerTest.java | 7 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 8 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 31 +++--
.../runtime/tasks/SourceStreamTaskTest.java | 3 +-
.../StreamTaskCancellationBarrierTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 37 ++---
.../runtime/tasks/TwoInputStreamTaskTest.java | 29 ++--
.../util/AbstractStreamOperatorTestHarness.java | 10 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 7 +-
.../test/checkpointing/SavepointITCase.java | 51 ++++---
.../streaming/runtime/StateBackendITCase.java | 7 +-
74 files changed, 1173 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 80ae294..72f2f21 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -941,7 +941,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
}
-
private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
private String key;
private String expect;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a0efe78..bd8d4dd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @param checkpointId The Id of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
+ * @param checkpointOptions Options for how to perform this checkpoint.
* @return Future to the state handle of the snapshot data.
* @throws Exception
*/
@@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public RunnableFuture<KeyGroupsStateHandle> snapshot(
final long checkpointId,
final long timestamp,
- final CheckpointStreamFactory streamFactory) throws Exception {
+ final CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6b09a8a..3fd5d0f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -219,6 +219,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bce8028..90de7a6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
task.cancel();
@@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest {
StringSerializer.INSTANCE,
new ValueStateDescriptor<>("foobar", String.class));
- RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+ checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index dc90666..c7b5c20 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testRunningSnapshotAfterBackendClosed() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+ CheckpointOptions.forFullCheckpoint());
RocksDB spyDB = keyedStateBackend.db;
@@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testReleasingSnapshotAfterBackendClosed() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+ CheckpointOptions.forFullCheckpoint());
RocksDB spyDB = keyedStateBackend.db;
@@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testDismissingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
snapshot.cancel(true);
verifyRocksObjectsReleased();
}
@@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testDismissingSnapshotNotRunnable() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
snapshot.cancel(true);
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
@@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testCompletingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
@@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testCancelRunningSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 36649ad..c1c65b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -296,15 +298,42 @@ public class CheckpointCoordinator {
checkNotNull(targetDirectory, "Savepoint target directory");
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
- CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);
- if (result.isSuccess()) {
- return result.getPendingCheckpoint().getCompletionFuture();
- }
- else {
- Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
- return FlinkCompletableFuture.completedExceptionally(cause);
+ // Create the unique savepoint directory
+ final String savepointDirectory = SavepointStore
+ .createSavepointDirectory(targetDirectory, job);
+
+ CheckpointTriggerResult triggerResult = triggerCheckpoint(
+ timestamp,
+ props,
+ savepointDirectory,
+ false);
+
+ Future<CompletedCheckpoint> result;
+
+ if (triggerResult.isSuccess()) {
+ result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+ } else {
+ Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
+ result = FlinkCompletableFuture.completedExceptionally(cause);
}
+
+ // Make sure to remove the created savepoint directory if the savepoint fails
+ result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+ @Override
+ public Void apply(Throwable value) {
+ try {
+ SavepointStore.deleteSavepointDirectory(savepointDirectory);
+ } catch (Throwable t) {
+ LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+ + " after failed savepoint.", t);
+ }
+
+ return null;
+ }
+ }, executor);
+
+ return result;
}
/**
@@ -517,9 +546,16 @@ public class CheckpointCoordinator {
}
// end of lock scope
+ CheckpointOptions checkpointOptions;
+ if (!props.isSavepoint()) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else {
+ checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+ }
+
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
- execution.triggerCheckpoint(checkpointID, timestamp);
+ execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
numUnsuccessfulCheckpointsTriggers.set(0);
@@ -756,7 +792,7 @@ public class CheckpointCoordinator {
triggerQueuedRequests();
}
-
+
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
@@ -1030,7 +1066,7 @@ public class CheckpointCoordinator {
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final StateObject stateObject) {
-
+
if (stateObject != null) {
executor.execute(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
new file mode 100644
index 0000000..cb98d10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing a checkpoint or savepoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+ private static final long serialVersionUID = 5010126558083292915L;
+
+ /** Type of the checkpoint. */
+ @Nonnull
+ private final CheckpointType checkpointType;
+
+ /** Target location for the checkpoint, or null if none was specified. */
+ @Nullable
+ private final String targetLocation;
+
+ private CheckpointOptions(
+ @Nonnull CheckpointType checkpointType,
+ @Nullable String targetLocation) {
+ this.checkpointType = checkNotNull(checkpointType);
+ this.targetLocation = targetLocation;
+ }
+
+ /**
+ * Returns the type of checkpoint to perform.
+ *
+ * @return Type of checkpoint to perform.
+ */
+ @Nonnull
+ public CheckpointType getCheckpointType() {
+ return checkpointType;
+ }
+
+ /**
+ * Returns a custom target location or <code>null</code> if none
+ * was specified.
+ *
+ * @return A custom target location or <code>null</code>.
+ */
+ @Nullable
+ public String getTargetLocation() {
+ return targetLocation;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointOptions(" + checkpointType + ", targetLocation=" + targetLocation + ")";
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+ public static CheckpointOptions forFullCheckpoint() {
+ return FULL_CHECKPOINT;
+ }
+
+ public static CheckpointOptions forSavepoint(String targetDirectory) {
+ checkNotNull(targetDirectory, "targetDirectory");
+ return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The type of checkpoint to perform.
+ */
+ public enum CheckpointType {
+
+ /** A full checkpoint. */
+ FULL_CHECKPOINT,
+
+ /** A savepoint, written to the target location of the options. */
+ SAVEPOINT;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
void discard() throws Exception {
try {
if (externalPath != null) {
- SavepointStore.removeSavepoint(externalPath);
+ SavepointStore.removeSavepointFile(externalPath);
}
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
externalPath = SavepointStore.storeSavepoint(
targetDirectory,
- savepoint);
+ savepoint
+ );
} catch (IOException e) {
LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
*
* <p>Stored savepoints have the following format:
* <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
/** Magic number for sanity checks against stored savepoints. */
public static final int MAGIC_NUMBER = 0x4960672d;
- /** Prefix for savepoint files. */
- private static final String prefix = "savepoint-";
+ private static final String META_DATA_FILE = "_metadata";
/**
- * Stores the savepoint.
+ * Creates a savepoint directory.
*
- * @param targetDirectory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- * @param <T> Savepoint type
- * @return Path of stored savepoint
- * @throws Exception Failures during store are forwarded
+ * @param baseDirectory Base target directory for the savepoint
+ * @param jobId Optional JobID the savepoint belongs to
+ * @return The created savepoint directory
+ * @throws IOException FileSystem operation failures are forwarded
*/
- public static <T extends Savepoint> String storeSavepoint(
- String targetDirectory,
- T savepoint) throws IOException {
-
- checkNotNull(targetDirectory, "Target directory");
- checkNotNull(savepoint, "Savepoint");
+ public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
+ String prefix;
+ if (jobId == null) {
+ prefix = "savepoint-";
+ } else {
+ prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
+ }
Exception latestException = null;
- Path path = null;
- FSDataOutputStream fdos = null;
+ Path savepointDirectory = null;
FileSystem fs = null;
// Try to create a FS output stream
for (int attempt = 0; attempt < 10; attempt++) {
- path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+ Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
if (fs == null) {
fs = FileSystem.get(path.toUri());
}
try {
- fdos = fs.create(path, false);
- break;
+ if (fs.mkdirs(path)) {
+ savepointDirectory = path;
+ break;
+ }
} catch (Exception e) {
latestException = e;
}
}
- if (fdos == null) {
- throw new IOException("Failed to create file output stream at " + path, latestException);
+ if (savepointDirectory == null) {
+ throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
+ } else {
+ return savepointDirectory.toString();
}
+ }
+
+ /**
+ * Deletes a savepoint directory.
+ *
+ * @param savepointDirectory Savepoint directory to delete (recursively, including all contents)
+ * @throws IOException FileSystem operation failures are forwarded
+ */
+ public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
+ Path path = new Path(savepointDirectory);
+ FileSystem fs = FileSystem.get(path.toUri());
+ fs.delete(path, true);
+ }
+
+ /**
+ * Stores the savepoint metadata file.
+ *
+ * @param <T> Savepoint type
+ * @param directory Target directory to store savepoint in
+ * @param savepoint Savepoint to be stored
+ * @return Path of the savepoint directory the meta data file was written to
+ * @throws IOException Failures during store are forwarded
+ */
+ public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+ checkNotNull(directory, "Target directory");
+ checkNotNull(savepoint, "Savepoint");
+
+ Path basePath = new Path(directory);
+ FileSystem fs = FileSystem.get(basePath.toUri());
+
+ Path path = new Path(basePath, META_DATA_FILE);
+ FSDataOutputStream fdos = fs.create(path, false);
boolean success = false;
try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
}
}
- return path.toString();
+ return basePath.toString();
}
/**
* Loads the savepoint at the specified path.
*
- * @param path Path of savepoint to load
+ * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
* @return The loaded savepoint
* @throws Exception Failures during load are forwared
*/
- public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
- Preconditions.checkNotNull(path, "Path");
+ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
+ Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+ Path path = new Path(savepointFileOrDirectory);
+
+ LOG.info("Loading savepoint from {}", path);
- try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+ FileSystem fs = FileSystem.get(path.toUri());
+
+ FileStatus status = fs.getFileStatus(path);
+
+ // If this is a directory, we need to find the meta data file
+ if (status.isDir()) {
+ Path candidatePath = new Path(path, META_DATA_FILE);
+ if (fs.exists(candidatePath)) {
+ path = candidatePath;
+ LOG.info("Using savepoint file in {}", path);
+ } else {
+ throw new IOException("Cannot find meta data file in directory " + path
+ + ". Please try to load the savepoint directly from the meta data file "
+ + "instead of the directory.");
+ }
+ }
+
+ try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
int magicNumber = dis.readInt();
if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
* @param path Path of savepoint to remove
* @throws Exception Failures during disposal are forwarded
*/
- public static void removeSavepoint(String path) throws IOException {
+ public static void removeSavepointFile(String path) throws IOException {
Preconditions.checkNotNull(path, "Path");
try {
@@ -173,14 +230,4 @@ public class SavepointStore {
}
}
- private static FSDataInputStream createFsInputStream(Path path) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri());
-
- if (fs.exists(path)) {
- return fs.open(path);
- } else {
- throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*
* @param checkpointId of th checkpoint to trigger
* @param timestamp of the checkpoint to trigger
+ * @param checkpointOptions of the checkpoint to trigger
*/
- public void triggerCheckpoint(long checkpointId, long timestamp) {
+ public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+ taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 59f56b0..0752897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,10 +18,15 @@
package org.apache.flink.runtime.io.network.api;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
/**
@@ -43,12 +48,14 @@ public class CheckpointBarrier extends RuntimeEvent {
private long id;
private long timestamp;
+ private CheckpointOptions checkpointOptions;
public CheckpointBarrier() {}
- public CheckpointBarrier(long id, long timestamp) {
+ public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
this.id = id;
this.timestamp = timestamp;
+ this.checkpointOptions = checkNotNull(checkpointOptions);
}
public long getId() {
@@ -59,20 +66,53 @@ public class CheckpointBarrier extends RuntimeEvent {
return timestamp;
}
+ public CheckpointOptions getCheckpointOptions() {
+ return checkpointOptions;
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialization
// ------------------------------------------------------------------------
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
out.writeLong(timestamp);
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+ // Type ordinal; savepoints are additionally followed by their target location
+ out.writeInt(checkpointType.ordinal());
+
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ return;
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = checkpointOptions.getTargetLocation();
+ checkNotNull(targetLocation, "Savepoint barrier without target location"); // 'assert' is off by default at runtime
+ out.writeUTF(targetLocation);
+ } else {
+ throw new IOException("Unknown CheckpointType " + checkpointType);
+ }
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
timestamp = in.readLong();
+
+ int typeOrdinal = in.readInt();
+ checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+ CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
+
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = in.readUTF();
+ checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+ } else {
+ throw new IOException("Illegal CheckpointType " + checkpointType);
+ }
}
-
+
+
// ------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 4d9f431..223cbfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import java.nio.charset.Charset;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +37,7 @@ import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.flink.util.Preconditions;
/**
* Utility class to serialize and deserialize task events.
@@ -60,10 +64,34 @@ public class EventSerializer {
else if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier barrier = (CheckpointBarrier) event;
- ByteBuffer buf = ByteBuffer.allocate(20);
- buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
- buf.putLong(4, barrier.getId());
- buf.putLong(12, barrier.getTimestamp());
+ CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+ // Layout: int tag | long id | long timestamp | int type ordinal [| int length | UTF-8 target location]
+ ByteBuffer buf;
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ buf = ByteBuffer.allocate(24);
+ buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+ buf.putLong(4, barrier.getId());
+ buf.putLong(12, barrier.getTimestamp());
+ buf.putInt(20, checkpointType.ordinal());
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = checkpointOptions.getTargetLocation();
+ Preconditions.checkNotNull(targetLocation, "Savepoint barrier without target location"); // 'assert' is off by default at runtime
+ byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+
+ buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+ buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+ buf.putLong(4, barrier.getId());
+ buf.putLong(12, barrier.getTimestamp());
+ buf.putInt(20, checkpointType.ordinal());
+ buf.putInt(24, bytes.length);
+ for (int i = 0; i < bytes.length; i++) {
+ buf.put(28 + i, bytes[i]);
+ }
+ } else {
+ throw new IOException("Unknown checkpoint type: " + checkpointType);
+ }
+
return buf;
}
else if (eventClass == EndOfSuperstepEvent.class) {
@@ -172,7 +200,28 @@ public class EventSerializer {
else if (type == CHECKPOINT_BARRIER_EVENT) {
long id = buffer.getLong();
long timestamp = buffer.getLong();
- return new CheckpointBarrier(id, timestamp);
+
+ CheckpointOptions checkpointOptions;
+ // Validate the decoded ordinal (not the event tag) before indexing CheckpointType.values()
+ int checkpointTypeOrdinal = buffer.getInt();
+ Preconditions.checkElementIndex(checkpointTypeOrdinal, CheckpointType.values().length,
+ "Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+ CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
+ // Savepoints additionally carry a length-prefixed UTF-8 target location
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ int len = buffer.getInt();
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+
+ checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+ } else {
+ throw new IOException("Unknown checkpoint type: " + checkpointType);
+ }
+
+ return new CheckpointBarrier(id, timestamp, checkpointOptions);
}
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 87b66ce..0930011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.TaskStateHandles;
/**
@@ -46,21 +47,23 @@ public interface StatefulTask {
* method.
*
* @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
- boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
+ boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
/**
* This method is called when a checkpoint is triggered as a result of receiving checkpoint
* barriers on all input streams.
*
* @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
* @param checkpointMetrics Metrics about this checkpoint
*
* @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
*/
- void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
+ void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
/**
* Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index fe4ecfb..2876ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.concurrent.Future;
@@ -196,12 +197,13 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
- long timestamp) {
+ long timestamp,
+ CheckpointOptions checkpointOptions) {
Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(jobId);
- actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+ actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index db0a3bf..09f104f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -160,12 +161,14 @@ public interface TaskManagerGateway {
* @param jobId identifying the job to which the task belongs
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
+ * @param checkpointOptions of the checkpoint to trigger
*/
void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
- long timestamp);
+ long timestamp,
+ CheckpointOptions checkpointOptions);
/**
* Request the task manager log from the task manager.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index eba97d2..28fef27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -123,7 +124,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+ public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
// taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
index 0528755..3477e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.messages.checkpoint;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
/**
@@ -33,9 +36,19 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
/** The timestamp associated with the checkpoint */
private final long timestamp;
- public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+ /** Options for how to perform the checkpoint. */
+ private final CheckpointOptions checkpointOptions;
+
+ public TriggerCheckpoint(
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) {
+
super(job, taskExecutionId, checkpointId);
this.timestamp = timestamp;
+ this.checkpointOptions = checkNotNull(checkpointOptions);
}
// --------------------------------------------------------------------------------------------
@@ -44,6 +57,10 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
return timestamp;
}
+ public CheckpointOptions getCheckpointOptions() {
+ return checkpointOptions;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 3ed49f1..14f897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base implementation of KeyedStateBackend. The state can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}.
*
* @param <K> Type of the key by which state is keyed.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -31,6 +32,7 @@ import java.io.IOException;
*/
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
+
private static final long serialVersionUID = 4620415814639230247L;
@Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
String operatorIdentifier) throws IOException;
@Override
+ public abstract CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException;
+
+ @Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
- long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
if (registeredStates.isEmpty()) {
return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return partitionOffsets;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import java.util.Collection;
import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
/**
* Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable<S extends StateObject> {
* @param checkpointId The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
+ * @param checkpointOptions Options for how to perform this checkpoint.
* @return A runnable future that will yield a {@link StateObject}.
*/
RunnableFuture<S> snapshot(
long checkpointId,
long timestamp,
- CheckpointStreamFactory streamFactory) throws Exception;
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
*/
CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
+ /**
+ * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+ * that should end up in a savepoint.
+ *
+ * <p>This is only called if the triggered checkpoint is a savepoint. Commonly
+ * this will return the same factory as for regular checkpoints, but maybe
+ * slightly adjusted.
+ *
+ * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+ * @param operatorIdentifier An identifier of the operator for which we create streams.
+ * @param targetLocation An optional custom location for the savepoint stream.
+ *
+ * @return The stream factory for savepoints.
+ *
+ * @throws IOException Failures during stream creation are forwarded.
+ */
+ CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException;
+
// ------------------------------------------------------------------------
// Structure Backends
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
MAX_FILE_STATE_THRESHOLD);
}
this.fileStateThreshold = fileStateSizeThreshold;
+
Path basePath = checkpointDataUri;
+ filesystem = basePath.getFileSystem();
- Path dir = new Path(basePath, jobId.toString());
+ checkpointDirectory = createBasePath(filesystem, basePath, jobId);
if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing file stream factory to URI {}.", dir);
+ LOG.debug("Initialized file stream factory to URI {}.", checkpointDirectory);
}
-
- filesystem = basePath.getFileSystem();
- filesystem.mkdirs(dir);
-
- checkpointDirectory = dir;
}
@Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
checkFileSystemInitialized();
- Path checkpointDir = createCheckpointDirPath(checkpointID);
+ Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
}
}
- private Path createCheckpointDirPath(long checkpointID) {
+ protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+ Path dir = new Path(checkpointDirectory, jobID.toString());
+ fs.mkdirs(dir);
+ return dir;
+ }
+
+ protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
return new Path(checkpointDirectory, "chk-" + checkpointID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} for savepoints that writes to a {@link FileSystem}.
+ *
+ * <p>Unlike the parent {@link FsCheckpointStreamFactory}, no per-job or
+ * {@code chk-<id>} sub-directories are used: all files are written directly
+ * into the given savepoint directory, which is expected to already be unique
+ * per savepoint.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+ public FsSavepointStreamFactory(
+ Path checkpointDataUri,
+ JobID jobId,
+ int fileStateSizeThreshold) throws IOException {
+ // checkpointDataUri is used as-is as the savepoint directory (see createBasePath).
+ super(checkpointDataUri, jobId, fileStateSizeThreshold);
+ }
+
+ @Override
+ protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+ // Skip the per-job sub-directory the parent would add: the savepoint
+ // directory is already unique, so files go directly into it.
+ return checkpointDirectory;
+ }
+
+ @Override
+ protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
+ // Skip the per-checkpoint "chk-<id>" sub-directory: a savepoint directory
+ // only ever holds the files of a single savepoint.
+ return checkpointDirectory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ // The target location names the (unique) savepoint directory and is used
+ // as-is; the operator identifier is not needed for this layout.
+ final Path savepointDirectory = new Path(targetLocation);
+ return new FsSavepointStreamFactory(savepointDirectory, jobId, fileStateThreshold);
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public RunnableFuture<KeyGroupsStateHandle> snapshot(
long checkpointId,
long timestamp,
- CheckpointStreamFactory streamFactory) throws Exception {
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
if (stateTables.isEmpty()) {
return new DoneFuture<>(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ // Savepoints use the same stream factory type as regular checkpoints;
+ // jobId, operatorIdentifier and targetLocation are all ignored here.
+ final CheckpointStreamFactory savepointFactory = new MemCheckpointStreamFactory(maxStateSize);
+ return savepointFactory;
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID,
String operatorIdentifier,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ----------------------------------------------------------------------
@RpcMethod
- public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+ public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+ task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return Acknowledge.get();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param executionAttemptID identifying the task
* @param checkpointID unique id for the checkpoint
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+ * @param checkpointOptions for performing the checkpoint
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
- Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+ Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
+ * @param checkpointOptions Options for performing this checkpoint.
*/
- public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
+ public void triggerCheckpointBarrier(
+ final long checkpointID,
+ long checkpointTimestamp,
+ final CheckpointOptions checkpointOptions) {
+
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
// activate safety net for checkpointing thread
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
- boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
+ boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
savepoint.dispose()
// Remove the header file
- SavepointStore.removeSavepoint(savepointPath)
+ SavepointStore.removeSavepointFile(savepointPath)
senderRef ! DisposeSavepointSuccess
} catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
+ val checkpointOptions = message.getCheckpointOptions
log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, timestamp)
+ task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
} else {
log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
}