You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:29 UTC
[1/7] flink git commit: [hotfix] [docs] Fix JavaDoc errors in
'flink-streaming-java'
Repository: flink
Updated Branches:
refs/heads/master 30c9e2b68 -> 417597fbf
[hotfix] [docs] Fix JavaDoc errors in 'flink-streaming-java'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f9f38bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f9f38bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f9f38bf
Branch: refs/heads/master
Commit: 1f9f38bf6312529af3ac527bf2f80f2ecee4d62b
Parents: 30c9e2b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 12:50:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 17:44:25 2017 +0100
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 4 +--
.../api/datastream/WindowedStream.java | 4 +--
.../environment/StreamExecutionEnvironment.java | 7 +++---
.../api/functions/async/AsyncFunction.java | 26 ++++++++++----------
.../api/operators/StreamSourceContexts.java | 6 ++---
.../api/operators/async/AsyncWaitOperator.java | 11 +++++----
.../operators/windowing/MergingWindowSet.java | 2 +-
7 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index c3c7424..4f4546e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -826,7 +826,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*
- * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction)} instead.
+ * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction)} instead.
*/
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
@@ -851,7 +851,7 @@ public class AllWindowedStream<T, W extends Window> {
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*
- * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead.
+ * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead.
*/
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 6809df0..b28434c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1284,7 +1284,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*
- * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction)} instead.
+ * @deprecated Use {@link #fold(Object, FoldFunction, WindowFunction)} instead.
*/
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
@@ -1309,7 +1309,7 @@ public class WindowedStream<T, K, W extends Window> {
* @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*
- * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction, TypeInformation, TypeInformation)} instead.
+ * @deprecated Use {@link #fold(Object, FoldFunction, WindowFunction, TypeInformation, TypeInformation)} instead.
*/
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dab0a06..e299e84 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
@@ -484,7 +485,7 @@ public abstract class StreamExecutionEnvironment {
* The number of times the system will try to re-execute failed tasks.
*
* @deprecated This method will be replaced by {@link #setRestartStrategy}. The
- * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
+ * {@link RestartStrategies#fixedDelayRestart(int, Time)} contains the number of
* execution retries.
*/
@Deprecated
@@ -500,9 +501,7 @@ public abstract class StreamExecutionEnvironment {
*
* @return The number of times the system will try to re-execute failed tasks.
*
- * @deprecated This method will be replaced by {@link #getRestartStrategy}. The
- * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
- * execution retries.
+ * @deprecated This method will be replaced by {@link #getRestartStrategy}.
*/
@Deprecated
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 4de2db1..419c3ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -26,34 +26,34 @@ import java.io.Serializable;
/**
* A function to trigger Async I/O operation.
- * <p>
- * For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
+ *
+ * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
* the result can be collected by calling {@link AsyncCollector#collect}. For each async
* operation, its context is stored in the operator immediately after invoking
* #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
- * <p>
- * {@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
+ *
+ * <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
* An error can also be propagate to the async IO operator by
* {@link AsyncCollector#collect(Throwable)}.
*
- * <p>
- * Callback example usage:
+ * <p>Callback example usage:
+ *
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
- * @Override
+ *
* public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
* HBaseCallback cb = new HBaseCallback(collector);
* Get get = new Get(Bytes.toBytes(row));
* hbase.asyncGet(get, cb);
* }
* }
- * </pre>
+ * }</pre>
*
- * <p>
- * Future example usage:
+ * <p>Future example usage:
+ *
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
- * @Override
+ *
* public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
* Get get = new Get(Bytes.toBytes(row));
* ListenableFuture<Result> future = hbase.asyncGet(get);
@@ -68,14 +68,14 @@ import java.io.Serializable;
* });
* }
* }
- * </pre>
+ * }</pre>
*
* @param <IN> The type of the input elements.
* @param <OUT> The type of the returned elements.
*/
-
@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
+
/**
* Trigger async operation for each stream input.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 98281c4..e4d051c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -38,9 +38,9 @@ public class StreamSourceContexts {
* Depending on the {@link TimeCharacteristic}, this method will return the adequate
* {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is:
* <ul>
- * <li>{@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}</li>
- * <li>{@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}</li>
- * <li>{@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}</li>
+ * <li>{@link TimeCharacteristic#IngestionTime} = {@code AutomaticWatermarkContext}</li>
+ * <li>{@link TimeCharacteristic#ProcessingTime} = {@code NonTimestampContext}</li>
+ * <li>{@link TimeCharacteristic#EventTime} = {@code ManualWatermarkContext}</li>
* </ul>
* */
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a70d825..4cf79b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -61,13 +62,13 @@ import java.util.concurrent.TimeoutException;
* Within the async function, the user can complete the async collector arbitrarily. Once the async
* collector has been completed, the result is emitted by the operator's emitter to downstream
* operators.
- * <p>
- * The operator offers different output modes depending on the chosen
- * {@link AsyncDataStream.OutputMode}. In order to give exactly once processing guarantees, the
+ *
+ * <p>The operator offers different output modes depending on the chosen
+ * {@link OutputMode}. In order to give exactly once processing guarantees, the
* operator stores all currently in-flight {@link StreamElement} in it's operator state. Upon
* recovery the recorded set of stream elements is replayed.
- * <p>
- * In case of chaining of this operator, it has to be made sure that the operators in the chain are
+ *
+ * <p>In case of chaining of this operator, it has to be made sure that the operators in the chain are
* opened tail to head. The reason for this is that an opened {@link AsyncWaitOperator} starts
* already emitting recovered {@link StreamElement} to downstream operators.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index b79a3fa..de37cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -42,7 +42,7 @@ import java.util.Map;
*
* <p>A new window can be added to the set of in-flight windows using
* {@link #addWindow(Window, MergeFunction)}. This might merge other windows and the caller
- * must react accordingly in the {@link MergeFunction#merge(Object, Collection, Object, Collection)
+ * must react accordingly in the {@link MergeFunction#merge(Object, Collection, Object, Collection)}
* and adjust the outside view of windows and state.
*
* <p>Windows can be removed from the set of windows using {@link #retireWindow(Window)}.
[7/7] flink git commit: [hotfix] [checkpoints] Remove
equals()/hashCode() from CompletedCheckpoint as semantic equality is not well
defined.
Posted by se...@apache.org.
[hotfix] [checkpoints] Remove equals()/hashCode() from CompletedCheckpoint as semantic equality is not well defined.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/417597fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/417597fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/417597fb
Branch: refs/heads/master
Commit: 417597fbf71ac9062bed1abf04139d46ec830ec4
Parents: 8ffe75a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 22:19:11 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:50 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/CompletedCheckpoint.java | 24 +-------------------
1 file changed, 1 insertion(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/417597fb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 53d888e..db86484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -23,13 +23,13 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.StateUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Map;
-import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -206,29 +206,7 @@ public class CompletedCheckpoint implements Serializable {
// --------------------------------------------------------------------------------------------
@Override
- public boolean equals(Object obj) {
- if (obj instanceof CompletedCheckpoint) {
- CompletedCheckpoint other = (CompletedCheckpoint) obj;
-
- return job.equals(other.job) && checkpointID == other.checkpointID &&
- timestamp == other.timestamp && duration == other.duration &&
- taskStates.equals(other.taskStates);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return (int) (this.checkpointID ^ this.checkpointID >>> 32) +
- 31 * ((int) (this.timestamp ^ this.timestamp >>> 32) +
- 31 * ((int) (this.duration ^ this.duration >>> 32) +
- 31 * Objects.hash(job, taskStates)));
- }
-
- @Override
public String toString() {
return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
}
-
}
[5/7] flink git commit: [FLINK-5763] [checkpoints] Add
CheckpointOptions
Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Add CheckpointOptions
Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
to barrier-injecting tasks) and barriers (flowing inline with the data):
```java
public class CheckpointOptions {
// Type of checkpoint
// => FULL_CHECKPOINT
// => SAVEPOINT
@NonNull
CheckpointType getCheckpointType();
// Custom target location. This is a String, because for future
// backends it can be a logical location like a DB table.
@Nullable
String getTargetLocation();
}
```
This class would be the place to define more options for performing the
checkpoints (for example for incremental checkpoints).
These options are forwarded via the `StreamTask` to the `StreamOperator`s and
`Snapshotable` backends. The `AbstractStreamOperator` checks the options and
either i) forwards the shared per-operator `CheckpointStreamFactory` (as
before) or ii) requests a dedicated savepoint stream factory from the backend.
For this, the state backends provide the following new method:
```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```
The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular sub folders
per checkpoint).
We end up with the following directory layout for savepoints:
```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
| +---------------------------------------+
+-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
+---------------------------------------+
|
+- _metadata (one per savepoint)
+- :uuid (one data file per StreamTask)
+- ...
+- :uuid
```
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174
Branch: refs/heads/master
Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18
Parents: 1f9f38b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 17:56:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../connectors/fs/RollingSinkITCase.java | 1 -
.../state/RocksDBKeyedStateBackend.java | 5 +-
.../streaming/state/RocksDBStateBackend.java | 9 ++
.../state/RocksDBAsyncSnapshotTest.java | 8 +-
.../state/RocksDBStateBackendTest.java | 15 +-
.../checkpoint/CheckpointCoordinator.java | 56 ++++++--
.../runtime/checkpoint/CheckpointOptions.java | 108 +++++++++++++++
.../runtime/checkpoint/CompletedCheckpoint.java | 2 +-
.../runtime/checkpoint/PendingCheckpoint.java | 3 +-
.../checkpoint/savepoint/SavepointStore.java | 137 +++++++++++++------
.../flink/runtime/executiongraph/Execution.java | 6 +-
.../io/network/api/CheckpointBarrier.java | 44 +++++-
.../api/serialization/EventSerializer.java | 59 +++++++-
.../runtime/jobgraph/tasks/StatefulTask.java | 7 +-
.../slots/ActorTaskManagerGateway.java | 6 +-
.../jobmanager/slots/TaskManagerGateway.java | 5 +-
.../jobmaster/RpcTaskManagerGateway.java | 3 +-
.../messages/checkpoint/TriggerCheckpoint.java | 19 ++-
.../state/AbstractKeyedStateBackend.java | 3 +-
.../runtime/state/AbstractStateBackend.java | 8 ++
.../state/DefaultOperatorStateBackend.java | 8 +-
.../flink/runtime/state/Snapshotable.java | 5 +-
.../flink/runtime/state/StateBackend.java | 22 +++
.../filesystem/FsCheckpointStreamFactory.java | 21 +--
.../filesystem/FsSavepointStreamFactory.java | 58 ++++++++
.../state/filesystem/FsStateBackend.java | 9 ++
.../state/heap/HeapKeyedStateBackend.java | 4 +-
.../state/memory/MemoryStateBackend.java | 9 ++
.../runtime/taskexecutor/TaskExecutor.java | 5 +-
.../taskexecutor/TaskExecutorGateway.java | 4 +-
.../apache/flink/runtime/taskmanager/Task.java | 10 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../flink/runtime/taskmanager/TaskManager.scala | 3 +-
.../checkpoint/CheckpointCoordinatorTest.java | 53 ++++---
.../checkpoint/CheckpointOptionsTest.java | 48 +++++++
.../checkpoint/CheckpointStatsHistoryTest.java | 1 +
.../savepoint/MigrationV0ToV1Test.java | 2 +-
.../savepoint/SavepointLoaderTest.java | 4 +-
.../savepoint/SavepointStoreTest.java | 48 +++++--
.../io/network/api/CheckpointBarrierTest.java | 61 +++++++++
.../api/serialization/EventSerializerTest.java | 45 ++++--
.../io/network/api/writer/RecordWriterTest.java | 5 +-
.../jobmanager/JobManagerHARecoveryTest.java | 5 +-
.../messages/CheckpointMessagesTest.java | 3 +-
.../runtime/state/OperatorStateBackendTest.java | 3 +-
.../runtime/state/StateBackendTestBase.java | 39 +++---
.../FsSavepointStreamFactoryTest.java | 67 +++++++++
.../runtime/taskmanager/TaskAsyncCallTest.java | 9 +-
.../api/operators/AbstractStreamOperator.java | 43 +++++-
.../api/operators/OperatorSnapshotResult.java | 2 +-
.../streaming/api/operators/StreamOperator.java | 12 +-
.../streaming/runtime/io/BarrierBuffer.java | 5 +-
.../streaming/runtime/io/BarrierTracker.java | 9 +-
.../streaming/runtime/tasks/OperatorChain.java | 5 +-
.../streaming/runtime/tasks/StreamTask.java | 65 +++++++--
.../api/checkpoint/ListCheckpointedTest.java | 2 +-
.../operators/AbstractStreamOperatorTest.java | 65 +++++----
.../AbstractUdfStreamOperatorLifecycleTest.java | 12 +-
.../WrappingFunctionSnapshotRestoreTest.java | 2 +-
.../operators/async/AsyncWaitOperatorTest.java | 5 +-
.../io/BarrierBufferAlignmentLimitTest.java | 13 +-
.../io/BarrierBufferMassiveRandomTest.java | 3 +-
.../streaming/runtime/io/BarrierBufferTest.java | 33 ++---
.../runtime/io/BarrierTrackerTest.java | 7 +-
.../runtime/tasks/BlockingCheckpointsTest.java | 8 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 31 +++--
.../runtime/tasks/SourceStreamTaskTest.java | 3 +-
.../StreamTaskCancellationBarrierTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 37 ++---
.../runtime/tasks/TwoInputStreamTaskTest.java | 29 ++--
.../util/AbstractStreamOperatorTestHarness.java | 10 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 7 +-
.../test/checkpointing/SavepointITCase.java | 51 ++++---
.../streaming/runtime/StateBackendITCase.java | 7 +-
74 files changed, 1173 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 80ae294..72f2f21 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -941,7 +941,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
}
}
-
private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
private String key;
private String expect;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a0efe78..bd8d4dd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @param checkpointId The Id of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
+ * @param checkpointOptions Options for how to perform this checkpoint.
* @return Future to the state handle of the snapshot data.
* @throws Exception
*/
@@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public RunnableFuture<KeyGroupsStateHandle> snapshot(
final long checkpointId,
final long timestamp,
- final CheckpointStreamFactory streamFactory) throws Exception {
+ final CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
long startTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6b09a8a..3fd5d0f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -219,6 +219,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bce8028..90de7a6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
@@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest {
}
}
- task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+ task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
testHarness.processElement(new StreamRecord<>("Wohoo", 0));
BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
task.cancel();
@@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest {
StringSerializer.INSTANCE,
new ValueStateDescriptor<>("foobar", String.class));
- RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+ checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index dc90666..c7b5c20 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testRunningSnapshotAfterBackendClosed() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+ CheckpointOptions.forFullCheckpoint());
RocksDB spyDB = keyedStateBackend.db;
@@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testReleasingSnapshotAfterBackendClosed() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+ CheckpointOptions.forFullCheckpoint());
RocksDB spyDB = keyedStateBackend.db;
@@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testDismissingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
snapshot.cancel(true);
verifyRocksObjectsReleased();
}
@@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testDismissingSnapshotNotRunnable() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
snapshot.cancel(true);
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
@@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testCompletingSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
@@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
@Test
public void testCancelRunningSnapshot() throws Exception {
setupRocksKeyedStateBackend();
- RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+ RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
Thread asyncSnapshotThread = new Thread(snapshot);
asyncSnapshotThread.start();
waiter.await(); // wait for snapshot to run
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 36649ad..c1c65b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -296,15 +298,42 @@ public class CheckpointCoordinator {
checkNotNull(targetDirectory, "Savepoint target directory");
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
- CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);
- if (result.isSuccess()) {
- return result.getPendingCheckpoint().getCompletionFuture();
- }
- else {
- Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
- return FlinkCompletableFuture.completedExceptionally(cause);
+ // Create the unique savepoint directory
+ final String savepointDirectory = SavepointStore
+ .createSavepointDirectory(targetDirectory, job);
+
+ CheckpointTriggerResult triggerResult = triggerCheckpoint(
+ timestamp,
+ props,
+ savepointDirectory,
+ false);
+
+ Future<CompletedCheckpoint> result;
+
+ if (triggerResult.isSuccess()) {
+ result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+ } else {
+ Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
+ result = FlinkCompletableFuture.completedExceptionally(cause);
}
+
+ // Make sure to remove the created base directory on Exceptions
+ result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+ @Override
+ public Void apply(Throwable value) {
+ try {
+ SavepointStore.deleteSavepointDirectory(savepointDirectory);
+ } catch (Throwable t) {
+ LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+ + " after failed savepoint.", t);
+ }
+
+ return null;
+ }
+ }, executor);
+
+ return result;
}
/**
@@ -517,9 +546,16 @@ public class CheckpointCoordinator {
}
// end of lock scope
+ CheckpointOptions checkpointOptions;
+ if (!props.isSavepoint()) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else {
+ checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+ }
+
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
- execution.triggerCheckpoint(checkpointID, timestamp);
+ execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
numUnsuccessfulCheckpointsTriggers.set(0);
@@ -756,7 +792,7 @@ public class CheckpointCoordinator {
triggerQueuedRequests();
}
-
+
// record the time when this was completed, to calculate
// the 'min delay between checkpoints'
lastCheckpointCompletionNanos = System.nanoTime();
@@ -1030,7 +1066,7 @@ public class CheckpointCoordinator {
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final StateObject stateObject) {
-
+
if (stateObject != null) {
executor.execute(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
new file mode 100644
index 0000000..cb98d10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing the checkpoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+ private static final long serialVersionUID = 5010126558083292915L;
+
+ /** Type of the checkpoint. */
+ @Nonnull
+ private final CheckpointType checkpointType;
+
+ /** Target location for the checkpoint. */
+ @Nullable
+ private final String targetLocation;
+
+ private CheckpointOptions(
+ @Nonnull CheckpointType checkpointType,
+ String targetLocation) {
+ this.checkpointType = checkNotNull(checkpointType);
+ this.targetLocation = targetLocation;
+ }
+
+ /**
+ * Returns the type of checkpoint to perform.
+ *
+ * @return Type of checkpoint to perform.
+ */
+ @Nonnull
+ public CheckpointType getCheckpointType() {
+ return checkpointType;
+ }
+
+ /**
+ * Returns a custom target location or <code>null</code> if none
+ * was specified.
+ *
+ * @return A custom target location or <code>null</code>.
+ */
+ @Nullable
+ public String getTargetLocation() {
+ return targetLocation;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointOptions(" + checkpointType + ")";
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+ public static CheckpointOptions forFullCheckpoint() {
+ return FULL_CHECKPOINT;
+ }
+
+ public static CheckpointOptions forSavepoint(String targetDirectory) {
+ checkNotNull(targetDirectory, "targetDirectory");
+ return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The type of checkpoint to perform.
+ */
+ public enum CheckpointType {
+
+ /** A full checkpoint. */
+ FULL_CHECKPOINT,
+
+ /** A savepoint. */
+ SAVEPOINT;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
void discard() throws Exception {
try {
if (externalPath != null) {
- SavepointStore.removeSavepoint(externalPath);
+ SavepointStore.removeSavepointFile(externalPath);
}
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
externalPath = SavepointStore.storeSavepoint(
targetDirectory,
- savepoint);
+ savepoint
+ );
} catch (IOException e) {
LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
*
* <p>Stored savepoints have the following format:
* <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
/** Magic number for sanity checks against stored savepoints. */
public static final int MAGIC_NUMBER = 0x4960672d;
- /** Prefix for savepoint files. */
- private static final String prefix = "savepoint-";
+ private static final String META_DATA_FILE = "_metadata";
/**
- * Stores the savepoint.
+ * Creates a savepoint directory.
*
- * @param targetDirectory Target directory to store savepoint in
- * @param savepoint Savepoint to be stored
- * @param <T> Savepoint type
- * @return Path of stored savepoint
- * @throws Exception Failures during store are forwarded
+ * @param baseDirectory Base target directory for the savepoint
+ * @param jobId Optional JobID the savepoint belongs to
+ * @return The created savepoint directory
+ * @throws IOException FileSystem operation failures are forwarded
*/
- public static <T extends Savepoint> String storeSavepoint(
- String targetDirectory,
- T savepoint) throws IOException {
-
- checkNotNull(targetDirectory, "Target directory");
- checkNotNull(savepoint, "Savepoint");
+ public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
+ String prefix;
+ if (jobId == null) {
+ prefix = "savepoint-";
+ } else {
+ prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
+ }
Exception latestException = null;
- Path path = null;
- FSDataOutputStream fdos = null;
+ Path savepointDirectory = null;
FileSystem fs = null;
// Try to create a FS output stream
for (int attempt = 0; attempt < 10; attempt++) {
- path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+ Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
if (fs == null) {
fs = FileSystem.get(path.toUri());
}
try {
- fdos = fs.create(path, false);
- break;
+ if (fs.mkdirs(path)) {
+ savepointDirectory = path;
+ break;
+ }
} catch (Exception e) {
latestException = e;
}
}
- if (fdos == null) {
- throw new IOException("Failed to create file output stream at " + path, latestException);
+ if (savepointDirectory == null) {
+ throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
+ } else {
+ return savepointDirectory.getPath();
}
+ }
+
+ /**
+ * Deletes a savepoint directory.
+ *
+ * @param savepointDirectory Recursively deletes the given directory
+ * @throws IOException FileSystem operation failures are forwarded
+ */
+ public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
+ Path path = new Path(savepointDirectory);
+ FileSystem fs = FileSystem.get(path.toUri());
+ fs.delete(path, true);
+ }
+
+ /**
+ * Stores the savepoint metadata file.
+ *
+ * @param <T> Savepoint type
+ * @param directory Target directory to store savepoint in
+ * @param savepoint Savepoint to be stored
+ * @return Path of stored savepoint
+ * @throws Exception Failures during store are forwarded
+ */
+ public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+ checkNotNull(directory, "Target directory");
+ checkNotNull(savepoint, "Savepoint");
+
+ Path basePath = new Path(directory);
+ FileSystem fs = FileSystem.get(basePath.toUri());
+
+ Path path = new Path(basePath, META_DATA_FILE);
+ FSDataOutputStream fdos = fs.create(path, false);
boolean success = false;
try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
}
}
- return path.toString();
+ return basePath.toString();
}
/**
* Loads the savepoint at the specified path.
*
- * @param path Path of savepoint to load
+ * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
* @return The loaded savepoint
* @throws Exception Failures during load are forwarded
*/
- public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
- Preconditions.checkNotNull(path, "Path");
+ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
+ Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+ Path path = new Path(savepointFileOrDirectory);
+
+ LOG.info("Loading savepoint from {}", path);
- try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+ FileSystem fs = FileSystem.get(path.toUri());
+
+ FileStatus status = fs.getFileStatus(path);
+
+ // If this is a directory, we need to find the meta data file
+ if (status.isDir()) {
+ Path candidatePath = new Path(path, META_DATA_FILE);
+ if (fs.exists(candidatePath)) {
+ path = candidatePath;
+ LOG.info("Using savepoint file in {}", path);
+ } else {
+ throw new IOException("Cannot find meta data file in directory " + path
+ + ". Please try to load the savepoint directly from the meta data file "
+ + "instead of the directory.");
+ }
+ }
+
+ try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
int magicNumber = dis.readInt();
if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
* @param path Path of savepoint to remove
* @throws Exception Failures during disposal are forwarded
*/
- public static void removeSavepoint(String path) throws IOException {
+ public static void removeSavepointFile(String path) throws IOException {
Preconditions.checkNotNull(path, "Path");
try {
@@ -173,14 +230,4 @@ public class SavepointStore {
}
}
- private static FSDataInputStream createFsInputStream(Path path) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri());
-
- if (fs.exists(path)) {
- return fs.open(path);
- } else {
- throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
+ * @param checkpointOptions of the checkpoint to trigger
*/
- public void triggerCheckpoint(long checkpointId, long timestamp) {
+ public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+ taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 59f56b0..0752897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,10 +18,15 @@
package org.apache.flink.runtime.io.network.api;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
/**
@@ -43,12 +48,14 @@ public class CheckpointBarrier extends RuntimeEvent {
private long id;
private long timestamp;
+ private CheckpointOptions checkpointOptions;
public CheckpointBarrier() {}
- public CheckpointBarrier(long id, long timestamp) {
+ public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
this.id = id;
this.timestamp = timestamp;
+ this.checkpointOptions = checkNotNull(checkpointOptions);
}
public long getId() {
@@ -59,20 +66,53 @@ public class CheckpointBarrier extends RuntimeEvent {
return timestamp;
}
+ public CheckpointOptions getCheckpointOptions() {
+ return checkpointOptions;
+ }
+
+ // ------------------------------------------------------------------------
+ // Serialization
// ------------------------------------------------------------------------
@Override
public void write(DataOutputView out) throws IOException {
out.writeLong(id);
out.writeLong(timestamp);
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+ out.writeInt(checkpointType.ordinal());
+
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ return;
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = checkpointOptions.getTargetLocation();
+ assert(targetLocation != null);
+ out.writeUTF(targetLocation);
+ } else {
+ throw new IOException("Unknown CheckpointType " + checkpointType);
+ }
}
@Override
public void read(DataInputView in) throws IOException {
id = in.readLong();
timestamp = in.readLong();
+
+ int typeOrdinal = in.readInt();
+ checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+ CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
+
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = in.readUTF();
+ checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+ } else {
+ throw new IOException("Illegal CheckpointType " + checkpointType);
+ }
}
-
+
+
// ------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 4d9f431..223cbfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import java.nio.charset.Charset;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +37,7 @@ import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.flink.util.Preconditions;
/**
* Utility class to serialize and deserialize task events.
@@ -60,10 +64,34 @@ public class EventSerializer {
else if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier barrier = (CheckpointBarrier) event;
- ByteBuffer buf = ByteBuffer.allocate(20);
- buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
- buf.putLong(4, barrier.getId());
- buf.putLong(12, barrier.getTimestamp());
+ CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+ ByteBuffer buf;
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ buf = ByteBuffer.allocate(24);
+ buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+ buf.putLong(4, barrier.getId());
+ buf.putLong(12, barrier.getTimestamp());
+ buf.putInt(20, checkpointType.ordinal());
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ String targetLocation = checkpointOptions.getTargetLocation();
+ assert(targetLocation != null);
+ byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+
+ buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+ buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+ buf.putLong(4, barrier.getId());
+ buf.putLong(12, barrier.getTimestamp());
+ buf.putInt(20, checkpointType.ordinal());
+ buf.putInt(24, bytes.length);
+ for (int i = 0; i < bytes.length; i++) {
+ buf.put(28 + i, bytes[i]);
+ }
+ } else {
+ throw new IOException("Unknown checkpoint type: " + checkpointType);
+ }
+
return buf;
}
else if (eventClass == EndOfSuperstepEvent.class) {
@@ -172,7 +200,28 @@ public class EventSerializer {
else if (type == CHECKPOINT_BARRIER_EVENT) {
long id = buffer.getLong();
long timestamp = buffer.getLong();
- return new CheckpointBarrier(id, timestamp);
+
+ CheckpointOptions checkpointOptions;
+
+ int checkpointTypeOrdinal = buffer.getInt();
+ Preconditions.checkElementIndex(checkpointTypeOrdinal, CheckpointType.values().length,
+ "Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+ CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
+
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ checkpointOptions = CheckpointOptions.forFullCheckpoint();
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ int len = buffer.getInt();
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+
+ checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+ } else {
+ throw new IOException("Unknown checkpoint type: " + checkpointType);
+ }
+
+ return new CheckpointBarrier(id, timestamp, checkpointOptions);
}
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 87b66ce..0930011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.TaskStateHandles;
/**
@@ -46,21 +47,23 @@ public interface StatefulTask {
* method.
*
* @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
*
* @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
*/
- boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
+ boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
/**
* This method is called when a checkpoint is triggered as a result of receiving checkpoint
* barriers on all input streams.
*
* @param checkpointMetaData Meta data for about this checkpoint
+ * @param checkpointOptions Options for performing this checkpoint
* @param checkpointMetrics Metrics about this checkpoint
*
* @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
*/
- void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
+ void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
/**
* Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index fe4ecfb..2876ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.concurrent.Future;
@@ -196,12 +197,13 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
- long timestamp) {
+ long timestamp,
+ CheckpointOptions checkpointOptions) {
Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(jobId);
- actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+ actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index db0a3bf..09f104f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -160,12 +161,14 @@ public interface TaskManagerGateway {
* @param jobId identifying the job to which the task belongs
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
+ * @param checkpointOptions of the checkpoint to trigger
*/
void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
- long timestamp);
+ long timestamp,
+ CheckpointOptions checkpointOptions);
/**
* Request the task manager log from the task manager.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index eba97d2..28fef27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -123,7 +124,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+ public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
// taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
index 0528755..3477e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -18,7 +18,10 @@
package org.apache.flink.runtime.messages.checkpoint;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
/**
@@ -33,9 +36,19 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
/** The timestamp associated with the checkpoint */
private final long timestamp;
- public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+ /** Options for how to perform the checkpoint. */
+ private final CheckpointOptions checkpointOptions;
+
+ public TriggerCheckpoint(
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) {
+
super(job, taskExecutionId, checkpointId);
this.timestamp = timestamp;
+ this.checkpointOptions = checkNotNull(checkpointOptions);
}
// --------------------------------------------------------------------------------------------
@@ -44,6 +57,10 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
return timestamp;
}
+ public CheckpointOptions getCheckpointOptions() {
+ return checkpointOptions;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 3ed49f1..14f897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base implementation of KeyedStateBackend. The state can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}.
*
* @param <K> Type of the key by which state is keyed.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -31,6 +32,7 @@ import java.io.IOException;
*/
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
+
private static final long serialVersionUID = 4620415814639230247L;
@Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
String operatorIdentifier) throws IOException;
@Override
+ public abstract CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException;
+
+ @Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
- long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
if (registeredStates.isEmpty()) {
return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return partitionOffsets;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import java.util.Collection;
import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
/**
* Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable<S extends StateObject> {
* @param checkpointId The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
+ * @param checkpointOptions Options for how to perform this checkpoint.
* @return A runnable future that will yield a {@link StateObject}.
*/
RunnableFuture<S> snapshot(
long checkpointId,
long timestamp,
- CheckpointStreamFactory streamFactory) throws Exception;
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
*/
CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
+ /**
+ * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+ * that should end up in a savepoint.
+ *
+ * <p>This is only called if the triggered checkpoint is a savepoint. Commonly
+ * this will return the same factory as for regular checkpoints, but maybe
+ * slightly adjusted.
+ *
+ * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+ * @param operatorIdentifier An identifier of the operator for which we create streams.
+ * @param targetLocation An optional custom location for the savepoint stream.
+ *
+ * @return The stream factory for savepoints.
+ *
+ * @throws IOException Failures during stream creation are forwarded.
+ */
+ CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ @Nullable String targetLocation) throws IOException;
+
// ------------------------------------------------------------------------
// Structure Backends
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
MAX_FILE_STATE_THRESHOLD);
}
this.fileStateThreshold = fileStateSizeThreshold;
+
Path basePath = checkpointDataUri;
+ filesystem = basePath.getFileSystem();
- Path dir = new Path(basePath, jobId.toString());
+ checkpointDirectory = createBasePath(filesystem, basePath, jobId);
if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing file stream factory to URI {}.", dir);
+ LOG.debug("Initialized file stream factory to URI {}.", checkpointDirectory);
}
-
- filesystem = basePath.getFileSystem();
- filesystem.mkdirs(dir);
-
- checkpointDirectory = dir;
}
@Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
checkFileSystemInitialized();
- Path checkpointDir = createCheckpointDirPath(checkpointID);
+ Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
}
}
- private Path createCheckpointDirPath(long checkpointID) {
+ protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+ Path dir = new Path(checkpointDirectory, jobID.toString());
+ fs.mkdirs(dir);
+ return dir;
+ }
+
+ protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
return new Path(checkpointDirectory, "chk-" + checkpointID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} that produces streams that write to a
+ * {@link FileSystem}.
+ *
+ * <p>The difference to the parent {@link FsCheckpointStreamFactory} is only
+ * in the created directory layout. All checkpoint files go to the checkpoint
+ * directory.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+ public FsSavepointStreamFactory(
+ Path checkpointDataUri,
+ JobID jobId,
+ int fileStateSizeThreshold) throws IOException {
+
+ super(checkpointDataUri, jobId, fileStateSizeThreshold);
+ }
+
+ @Override
+ protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+ // No checkpoint specific directory required as the savepoint directory
+ // is already unique.
+ return checkpointDirectory;
+ }
+
+ @Override
+ protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
+ // No checkpoint specific directory required as the savepoint directory
+ // is already unique.
+ return checkpointDirectory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
public RunnableFuture<KeyGroupsStateHandle> snapshot(
long checkpointId,
long timestamp,
- CheckpointStreamFactory streamFactory) throws Exception {
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) throws Exception {
if (stateTables.isEmpty()) {
return new DoneFuture<>(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(
+ JobID jobId,
+ String operatorIdentifier,
+ String targetLocation) throws IOException {
+
+ return new MemCheckpointStreamFactory(maxStateSize);
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID,
String operatorIdentifier,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ----------------------------------------------------------------------
@RpcMethod
- public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+ public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+ task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return Acknowledge.get();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param executionAttemptID identifying the task
* @param checkpointID unique id for the checkpoint
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+ * @param checkpointOptions for performing the checkpoint
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
- Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+ Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
*
* @param checkpointID The ID identifying the checkpoint.
* @param checkpointTimestamp The timestamp associated with the checkpoint.
+ * @param checkpointOptions Options for performing this checkpoint.
*/
- public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
+ public void triggerCheckpointBarrier(
+ final long checkpointID,
+ long checkpointTimestamp,
+ final CheckpointOptions checkpointOptions) {
+
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
// activate safety net for checkpointing thread
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
- boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
+ boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
savepoint.dispose()
// Remove the header file
- SavepointStore.removeSavepoint(savepointPath)
+ SavepointStore.removeSavepointFile(savepointPath)
senderRef ! DisposeSavepointSuccess
} catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
val taskExecutionId = message.getTaskExecutionId
val checkpointId = message.getCheckpointId
val timestamp = message.getTimestamp
+ val checkpointOptions = message.getCheckpointOptions
log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
val task = runningTasks.get(taskExecutionId)
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, timestamp)
+ task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
} else {
log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
}
[6/7] flink git commit: [FLINK-5887] [checkpointing] Make
CheckpointBarrier type immutable.
Posted by se...@apache.org.
[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5
Branch: refs/heads/master
Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279
Parents: df16e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 15:04:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../runtime/checkpoint/CheckpointOptions.java | 2 +-
.../io/network/api/CheckpointBarrier.java | 66 +++++++-------------
.../io/network/api/CheckpointBarrierTest.java | 40 ++++++------
.../api/serialization/EventSerializerTest.java | 13 ++--
4 files changed, 46 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index cb98d10..676cf3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable {
private CheckpointOptions(
@Nonnull CheckpointType checkpointType,
- String targetLocation) {
+ @Nullable String targetLocation) {
this.checkpointType = checkNotNull(checkpointType);
this.targetLocation = targetLocation;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index a42c25d..97ad90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,18 +18,14 @@
package org.apache.flink.runtime.io.network.api;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
-import org.apache.flink.util.StringUtils;
/**
* Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils;
*/
public class CheckpointBarrier extends RuntimeEvent {
- private long id;
- private long timestamp;
- private CheckpointOptions checkpointOptions;
-
- public CheckpointBarrier() {}
+ private final long id;
+ private final long timestamp;
+ private final CheckpointOptions checkpointOptions;
public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
this.id = id;
@@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent {
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
-
+
+ //
+ // These methods are inherited from the generic serialization of AbstractEvent
+ // but would require the CheckpointBarrier to be mutable. Since all serialization
+ // for events goes through the EventSerializer class, which has special serialization
+ // for the CheckpointBarrier, we don't need these methods
+ //
+
@Override
public void write(DataOutputView out) throws IOException {
- out.writeLong(id);
- out.writeLong(timestamp);
- CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-
- out.writeInt(checkpointType.ordinal());
-
- if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
- return;
- } else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = checkpointOptions.getTargetLocation();
- checkState(targetLocation != null);
- StringUtils.writeString(targetLocation, out);
- } else {
- throw new IOException("Unknown CheckpointType " + checkpointType);
- }
+ throw new UnsupportedOperationException("This method should never be called");
}
@Override
public void read(DataInputView in) throws IOException {
- id = in.readLong();
- timestamp = in.readLong();
-
- int typeOrdinal = in.readInt();
- checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
- CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
-
- if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
- checkpointOptions = CheckpointOptions.forFullCheckpoint();
- } else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = StringUtils.readString(in);
- checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
- } else {
- throw new IOException("Illegal CheckpointType " + checkpointType);
- }
+ throw new UnsupportedOperationException("This method should never be called");
}
-
// ------------------------------------------------------------------------
@Override
public int hashCode() {
- return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+ return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32));
}
@Override
public boolean equals(Object other) {
- if (other == null || !(other instanceof CheckpointBarrier)) {
+ if (other == this) {
+ return true;
+ }
+ else if (other == null || other.getClass() != CheckpointBarrier.class) {
return false;
}
else {
CheckpointBarrier that = (CheckpointBarrier) other;
- return that.id == this.id && that.timestamp == this.timestamp;
+ return that.id == this.id && that.timestamp == this.timestamp &&
+ this.checkpointOptions.equals(that.checkpointOptions);
}
}
@Override
public String toString() {
- return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+ return String.format("CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointOptions);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ad9fc16..ba833c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,44 +18,40 @@
package org.apache.flink.runtime.io.network.api;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
+
import org.junit.Test;
+import static org.junit.Assert.fail;
+
public class CheckpointBarrierTest {
/**
* Test serialization of the checkpoint barrier.
+ * The checkpoint barrier does not support its own serialization, in order to be immutable.
*/
@Test
public void testSerialization() throws Exception {
long id = Integer.MAX_VALUE + 123123L;
long timestamp = Integer.MAX_VALUE + 1228L;
- CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
- testSerialization(id, timestamp, checkpoint);
-
- CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
- testSerialization(id, timestamp, savepoint);
- }
-
- private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
- DataOutputSerializer out = new DataOutputSerializer(1024);
- barrier.write(out);
-
- DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
- CheckpointBarrier deserialized = new CheckpointBarrier();
- deserialized.read(in);
-
- assertEquals(id, deserialized.getId());
- assertEquals(timestamp, deserialized.getTimestamp());
- assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
- assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ try {
+ barrier.write(new DataOutputSerializer(1024));
+ fail("should throw an exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ barrier.read(new DataInputDeserializer(new byte[32]));
+ fail("should throw an exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index e674eb7..f51b083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -147,15 +147,14 @@ public class EventSerializerTest {
* thinks the encoded buffer matches the class
* @throws IOException
*/
- private final boolean checkIsEvent(final AbstractEvent event,
- final Class<? extends AbstractEvent> eventClass) throws
- IOException {
- final Buffer serializedEvent =
- EventSerializer.toBuffer(event);
+ private boolean checkIsEvent(
+ AbstractEvent event,
+ Class<? extends AbstractEvent> eventClass) throws IOException {
+
+ final Buffer serializedEvent = EventSerializer.toBuffer(event);
try {
final ClassLoader cl = getClass().getClassLoader();
- return EventSerializer
- .isEvent(serializedEvent, eventClass, cl);
+ return EventSerializer.isEvent(serializedEvent, eventClass, cl);
} finally {
serializedEvent.recycle();
}
[3/7] flink git commit: [FLINK-5763] [checkpoints] Add
CheckpointOptions
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 46f228a..e407443 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// no call for a completed checkpoint must have happened
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+ any(CheckpointMetaData.class),
+ any(CheckpointOptions.class),
any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
@@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest {
// checkpoint 4 completed - check and validate buffered replay
check(sequence[9], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(
+ argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[10], buffer.getNextNonBlocked());
check(sequence[15], buffer.getNextNonBlocked());
@@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest {
check(sequence[21], buffer.getNextNonBlocked());
// only checkpoint 4 was successfully completed, not checkpoint 3
- verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+ argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest {
}
private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static void check(BufferOrEvent expected, BufferOrEvent present) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 0cf866a..6e088f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest {
if (barrierGens[currentChannel].isNextBarrier()) {
return new BufferOrEvent(
- new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+ new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
currentChannel);
} else {
Buffer buffer = bufferPools[currentChannel].requestBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 869d1fe..d6056d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -566,7 +567,7 @@ public class BarrierBufferTest {
// checkpoint done - replay buffered
check(sequence[5], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[6], buffer.getNextNonBlocked());
check(sequence[9], buffer.getNextNonBlocked());
@@ -1008,14 +1009,14 @@ public class BarrierBufferTest {
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[6], buffer.getNextNonBlocked());
assertEquals(5L, buffer.getCurrentCheckpointId());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.getNextNonBlocked());
@@ -1078,7 +1079,7 @@ public class BarrierBufferTest {
check(sequence[2], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[5], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[6], buffer.getNextNonBlocked());
@@ -1097,7 +1098,7 @@ public class BarrierBufferTest {
check(sequence[16], buffer.getNextNonBlocked());
startTs = System.nanoTime();
check(sequence[20], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[21], buffer.getNextNonBlocked());
@@ -1114,7 +1115,7 @@ public class BarrierBufferTest {
// a simple successful checkpoint
startTs = System.nanoTime();
check(sequence[32], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[33], buffer.getNextNonBlocked());
@@ -1175,7 +1176,7 @@ public class BarrierBufferTest {
// finished first checkpoint
check(sequence[3], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
validateAlignmentTime(startTs, buffer);
check(sequence[5], buffer.getNextNonBlocked());
@@ -1198,7 +1199,7 @@ public class BarrierBufferTest {
assertEquals(0L, buffer.getAlignmentDurationNanos());
// no further checkpoint (abort) notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
// all done
@@ -1280,7 +1281,7 @@ public class BarrierBufferTest {
// checkpoint done
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+ verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
// queued data
check(sequence[10], buffer.getNextNonBlocked());
@@ -1299,7 +1300,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1364,7 +1365,7 @@ public class BarrierBufferTest {
// checkpoint finished
check(sequence[7], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
check(sequence[11], buffer.getNextNonBlocked());
// remaining data
@@ -1380,7 +1381,7 @@ public class BarrierBufferTest {
checkNoTempFilesRemain();
// check overall notifications
- verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
@@ -1389,7 +1390,7 @@ public class BarrierBufferTest {
// ------------------------------------------------------------------------
private static BufferOrEvent createBarrier(long checkpointId, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
@@ -1487,12 +1488,12 @@ public class BarrierBufferTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("wrong checkpoint id",
nextExpectedCheckpointId == -1L ||
nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index da322f6..05f7da6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -470,7 +471,7 @@ public class BarrierTrackerTest {
// ------------------------------------------------------------------------
private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
}
private static BufferOrEvent createCancellationBarrier(long id, int channel) {
@@ -502,12 +503,12 @@ public class BarrierTrackerTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
throw new UnsupportedOperationException("should never be called");
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
final long expectedId = checkpointIDs[i++];
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5c0f0cf..51294ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -174,6 +175,11 @@ public class BlockingCheckpointsTest {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID, String operatorIdentifier,
TypeSerializer<K> keySerializer, int numberOfKeyGroups,
@@ -276,7 +282,7 @@ public class BlockingCheckpointsTest {
@Override
protected void run() throws Exception {
- triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
+ triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 69c2c88..e22bf86 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger {
// we should not yet see the barrier, only the two elements from non-blocked input
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
// now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
+ expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
@@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger {
// Now give a later barrier to all inputs, this should unblock the first channel,
// thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
+ expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
testHarness.waitForInputProcessing();
@@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger {
// Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
@@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
- while(!streamTask.triggerCheckpoint(checkpointMetaData));
+ while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()));
// since no state was set, there shouldn't be restore calls
assertEquals(0, TestingStreamOperator.numberRestoreCalls);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0773699..1a6fa8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -231,7 +232,7 @@ public class SourceStreamTaskTest {
for (int i = 0; i < numCheckpoints; i++) {
long currentCheckpointId = checkpointId.getAndIncrement();
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
- sourceTask.triggerCheckpoint(checkpointMetaData);
+ sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
Thread.sleep(checkpointInterval);
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index c2d4aaa..53f77ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest {
testHarness.invoke();
// tell the task to commence a checkpoint
- boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()));
+ boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()),
+ CheckpointOptions.forFullCheckpoint());
assertFalse("task triggered checkpoint though not ready", result);
// a cancellation barrier should be downstream
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1e74c3e..3d01fdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger {
final Exception testException = new Exception("Test exception");
- when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
// mock the returned legacy snapshots
StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
- when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
- when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
- when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+ when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+ when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+ when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
// set up the task
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
try {
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
fail("Expected test exception here.");
} catch (Exception e) {
assertEquals(testException, e.getCause());
@@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger {
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
- when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
// mock the legacy state snapshot
StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
- when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
- when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
- when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+ when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+ when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+ when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
@@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
@@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger {
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
StreamOperator<?>[] streamOperators = {streamOperator};
@@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
acknowledgeCheckpointLatch.await();
@@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger {
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
StreamOperator<?>[] streamOperators = {streamOperator};
@@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger {
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
- streamTask.triggerCheckpoint(checkpointMetaData);
+ streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
createSubtask.await();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index c0a1638..d465619 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// This element should be buffered since we received a checkpoint barrier on
// this input
@@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest {
expectedOutput,
testHarness.getOutput());
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
testHarness.endInput();
testHarness.waitForTaskCompletion();
// now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
+ expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest {
testHarness.invoke();
testHarness.waitForTaskRunning();
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
// These elements should be buffered until we receive barriers from
// all inputs
@@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest {
// Now give a later barrier to all inputs, this should unblock the first channel,
// thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
+ expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
testHarness.waitForInputProcessing();
@@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest {
// Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
testHarness.waitForInputProcessing();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 01afec6..07424f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
}
/**
- * Calls {@link StreamOperator#snapshotState(long, long)}.
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}.
*/
public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
- OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
+ CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op");
+
+ OperatorSnapshotResult operatorStateResult = operator.snapshotState(
+ checkpointId,
+ timestamp,
+ CheckpointOptions.forFullCheckpoint());
KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index cde5780..effb44c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
if (keyedStateBackend != null) {
- RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+ RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+ checkpointId,
timestamp,
- streamFactory);
+ streamFactory,
+ CheckpointOptions.forFullCheckpoint());
if(!keyedSnapshotRunnable.isDone()) {
Thread runner = new Thread(keyedSnapshotRunnable);
runner.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 128522b..ac37009 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import java.io.FileNotFoundException;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
@@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
final File checkpointDir = new File(tmpDir, "checkpoints");
- final File savepointDir = new File(tmpDir, "savepoints");
+ final File savepointRootDir = new File(tmpDir, "savepoints");
- if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+ if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
fail("Test setup failed: failed to create temporary directories.");
}
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
- LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
+ LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
- savepointDir.toURI().toString());
+ savepointRootDir.toURI().toString());
LOG.info("Flink configuration: " + config + ".");
@@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger {
.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
LOG.info("Retrieved savepoint path: " + savepointPath + ".");
- // Only one savepoint should exist
- File[] files = savepointDir.listFiles();
- if (files != null) {
- assertEquals("Savepoint not created in expected directory", 1, files.length);
- } else {
- fail("Savepoint not created in expected directory");
- }
-
// Retrieve the savepoint from the testing job manager
LOG.info("Requesting the savepoint.");
Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
@@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger {
// - Verification START -------------------------------------------
+ // Only one savepoint should exist
+ File[] files = savepointRootDir.listFiles();
+
+ if (files != null) {
+ assertEquals("Savepoint not created in expected directory", 1, files.length);
+ assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+
+ File savepointDir = files[0];
+ File[] savepointFiles = savepointDir.listFiles();
+ assertNotNull(savepointFiles);
+ assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+ } else {
+ fail("Savepoint not created in expected directory");
+ }
+
// Only one checkpoint of the savepoint should exist
// We currently have the following directory layout: checkpointDir/jobId/chk-ID
- files = checkpointDir.listFiles();
- assertNotNull("Checkpoint directory empty", files);
- assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
- assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
+ File jobCheckpoints = new File(checkpointDir, jobId.toString());
+
+ if (jobCheckpoints.exists()) {
+ files = jobCheckpoints.listFiles();
+ assertNotNull("Checkpoint directory empty", files);
+ assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+ }
// Only one savepoint should exist
- files = savepointDir.listFiles();
+ files = savepointRootDir.listFiles();
assertNotNull("Savepoint directory empty", files);
assertEquals("No savepoint found in savepoint directory", 1, files.length);
@@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger {
// All savepoints should have been cleaned up
errMsg = "Savepoints directory not cleaned up properly: " +
- Arrays.toString(savepointDir.listFiles()) + ".";
- assertEquals(errMsg, 0, savepointDir.listFiles().length);
+ Arrays.toString(savepointRootDir.listFiles()) + ".";
+ assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
// - Verification END ---------------------------------------------
} finally {
@@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger {
flink.submitJobAndWait(jobGraph, false);
} catch (Exception e) {
assertEquals(JobExecutionException.class, e.getClass());
- assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+ assertEquals(FileNotFoundException.class, e.getCause().getClass());
}
} finally {
if (flink != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ec6a8f5..79665dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -86,7 +86,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
}
-
public static class FailingStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
@@ -97,6 +96,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
}
@Override
+ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId,
+ String operatorIdentifier, String targetLocation) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
[4/7] flink git commit: [FLINK-5763] [checkpoints] Add
CheckpointOptions
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c2ada3b..d8e46fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest {
assertFalse(checkpoint.isFullyAcknowledged());
// check that the vertices received the trigger checkpoint message
- verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
- verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
+ verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+ verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
@@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest {
// check that the vertices received the trigger checkpoint message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
}
// check that the vertices received the trigger checkpoint message for the second checkpoint
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest {
// check that the vertices received the trigger checkpoint message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}
// acknowledge from one of the tasks
@@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
}
CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
@@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest {
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
@@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest {
CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
// we acknowledge the remaining two tasks from the first
// checkpoint and two tasks from the second checkpoint
@@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest {
long checkpointId1 = pending1.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
@@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest {
long checkpointId2 = pending2.getCheckpointId();
// trigger messages should have been sent
- verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
- verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+ verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+ verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
// we acknowledge one more task from the first checkpoint and the second
// checkpoint completely. The second checkpoint should then subsume the first checkpoint
@@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest {
numCalls.incrementAndGet();
return null;
}
- }).when(execution).triggerCheckpoint(anyLong(), anyLong());
+ }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
@@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest {
triggerCalls.add((Long) invocation.getArguments()[0]);
return null;
}
- }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
+ }).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
final long delay = 50;
@@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest {
assertFalse(savepointFuture.isDone());
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
@@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest {
// validate that the relevant tasks got a confirmation message
{
- verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
- verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+ verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+ verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest {
numCalls.incrementAndGet();
return null;
}
- }).when(execution).triggerCheckpoint(anyLong(), anyLong());
+ }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
doAnswer(new Answer<Void>() {
@Override
@@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest {
assertEquals(maxConcurrentAttempts, numCalls.get());
verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
- .triggerCheckpoint(anyLong(), anyLong());
+ .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
// now, once we acknowledge one checkpoint, it should trigger the next one
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
new file mode 100644
index 0000000..6788338
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.junit.Test;
+
+public class CheckpointOptionsTest {
+
+ @Test
+ public void testFullCheckpoint() throws Exception {
+ CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
+ assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType());
+ assertNull(options.getTargetLocation());
+ }
+
+ @Test
+ public void testSavepoint() throws Exception {
+ String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd";
+ CheckpointOptions options = CheckpointOptions.forSavepoint(location);
+ assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType());
+ assertEquals(location, options.getTargetLocation());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testSavepointNullCheck() throws Exception {
+ CheckpointOptions.forSavepoint(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 3c373f1..95a31d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest {
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
when(completed.getCheckpointId()).thenReturn(checkpointId);
+ when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
return completed;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 512768d..6ab8620 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -183,7 +183,7 @@ public class MigrationV0ToV1Test {
} finally {
// Dispose
- SavepointStore.removeSavepoint(path.toString());
+ SavepointStore.removeSavepointFile(path.toString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6471d6f..c66b29d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -64,12 +64,12 @@ public class SavepointLoaderTest {
Map<JobVertexID, TaskState> taskStates = new HashMap<>();
taskStates.put(vertexId, state);
+ JobID jobId = new JobID();
+
// Store savepoint
SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
- JobID jobId = new JobID();
-
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
when(vertex.getParallelism()).thenReturn(parallelism);
when(vertex.getMaxParallelism()).thenReturn(parallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 3398341..dc19e47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import java.io.File;
+import java.util.Arrays;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -38,6 +41,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -54,14 +58,22 @@ public class SavepointStoreTest {
*/
@Test
public void testStoreLoadDispose() throws Exception {
- String target = tmp.getRoot().getAbsolutePath();
+ String root = tmp.getRoot().getAbsolutePath();
+ File rootFile = new File(root);
- assertEquals(0, tmp.getRoot().listFiles().length);
+ File[] list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(0, list.length);
// Store
+ String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
- String path = SavepointStore.storeSavepoint(target, stored);
- assertEquals(1, tmp.getRoot().listFiles().length);
+ String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
+
+ list = rootFile.listFiles();
+ assertNotNull(list);
+ assertEquals(1, list.length);
// Load
Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
@@ -70,9 +82,11 @@ public class SavepointStoreTest {
loaded.dispose();
// Dispose
- SavepointStore.removeSavepoint(path);
+ SavepointStore.deleteSavepointDirectory(path);
- assertEquals(0, tmp.getRoot().listFiles().length);
+ list = rootFile.listFiles();
+ assertNotNull(list);
+ assertEquals(0, list.length);
}
/**
@@ -108,8 +122,8 @@ public class SavepointStoreTest {
assertTrue(serializers.size() >= 1);
- String target = tmp.getRoot().getAbsolutePath();
- assertEquals(0, tmp.getRoot().listFiles().length);
+ String root = tmp.getRoot().getAbsolutePath();
+ File rootFile = new File(root);
// New savepoint type for test
int version = ThreadLocalRandom.current().nextInt();
@@ -118,14 +132,24 @@ public class SavepointStoreTest {
// Add serializer
serializers.put(version, NewSavepointSerializer.INSTANCE);
+ String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
- String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint);
- assertEquals(1, tmp.getRoot().listFiles().length);
+ String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
+
+ File[] list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(1, list.length);
// Savepoint v0
+ String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
- String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint);
- assertEquals(2, tmp.getRoot().listFiles().length);
+ String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
+
+ list = rootFile.listFiles();
+
+ assertNotNull(list);
+ assertEquals(2, list.length);
// Load
Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
new file mode 100644
index 0000000..dd5b0b6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class CheckpointBarrierTest {
+
+ /**
+ * Test serialization of the checkpoint barrier.
+ */
+ @Test
+ public void testSerialization() throws Exception {
+ long id = Integer.MAX_VALUE + 123123L;
+ long timestamp = Integer.MAX_VALUE + 1228L;
+
+ CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+ testSerialization(id, timestamp, checkpoint);
+
+ CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+ testSerialization(id, timestamp, savepoint);
+ }
+
+ private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+
+ DataOutputSerializer out = new DataOutputSerializer(1024);
+ barrier.write(out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+ CheckpointBarrier deserialized = new CheckpointBarrier();
+ deserialized.read(in);
+
+ assertEquals(id, deserialized.getId());
+ assertEquals(timestamp, deserialized.getTimestamp());
+ assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+ assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 271d0d2..e674eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,6 +18,14 @@
package org.apache.flink.runtime.io.network.api.serialization;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -25,25 +33,42 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+public class EventSerializerTest {
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+ @Test
+ public void testCheckpointBarrierSerialization() throws Exception {
+ long id = Integer.MAX_VALUE + 123123L;
+ long timestamp = Integer.MAX_VALUE + 1228L;
-public class EventSerializerTest {
+ CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+ testCheckpointBarrierSerialization(id, timestamp, checkpoint);
+
+ CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+ testCheckpointBarrierSerialization(id, timestamp, savepoint);
+ }
+
+ private void testCheckpointBarrierSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+ ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier);
+ CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl);
+ assertFalse(serialized.hasRemaining());
+
+ assertEquals(id, deserialized.getId());
+ assertEquals(timestamp, deserialized.getTimestamp());
+ assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+ assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+ }
@Test
public void testSerializeDeserializeEvent() throws Exception {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
+ new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
@@ -94,7 +119,7 @@ public class EventSerializerTest {
AbstractEvent[] events = {
EndOfPartitionEvent.INSTANCE,
EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
+ new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
new TestTaskEvent(Math.random(), 12361231273L),
new CancelCheckpointMarker(287087987329842L)
};
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63175ed..900b5c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -327,7 +328,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
- CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L);
+ CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint());
// No records emitted yet, broadcast should not request a buffer
writer.broadcastEvent(barrier);
@@ -363,7 +364,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
- CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L);
+ CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint());
// Emit records on some channels first (requesting buffers), then
// broadcast the event. The record buffers should be emitted first, then
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index de54d1f..5a38be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare(
String.valueOf(UUID.randomUUID()),
InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
@@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index db45231..bc420cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -49,7 +50,7 @@ public class CheckpointMessagesTest {
NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
testSerializabilityEqualsHashCode(cc);
- TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+ TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint());
testSerializabilityEqualsHashCode(tc);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..94df524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Test;
@@ -165,7 +166,7 @@ public class OperatorStateBackendTest {
listState3.add(20);
CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
- OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get();
+ OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b0350d..f2416b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(13, (int) state2.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(
@@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals(42L, (long) state.value());
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("u3");
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add(103);
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
// make some more modifications
backend.setCurrentKey(1);
@@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
// draw another snapshot
- KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
// validate the original state
backend.setCurrentKey(1);
@@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("ShouldBeInSecondHalf");
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles(
Collections.singletonList(snapshot),
@@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.update("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.add("2");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
state.put("2", "Second");
// draw a snapshot
- KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+ KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
// restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
backend.dispose();
@@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
// draw a snapshot
- KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+ KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
assertNull(snapshot);
backend.dispose();
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
new file mode 100644
index 0000000..a29d29c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FsSavepointStreamFactoryTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ /**
+ * Tests that the factory creates all files in the given directory without
+ * creating any sub directories.
+ */
+ @Test
+ public void testSavepointStreamDirectoryLayout() throws Exception {
+ File testRoot = folder.newFolder();
+ JobID jobId = new JobID();
+
+ FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory(
+ new Path(testRoot.getAbsolutePath()),
+ jobId,
+ 0);
+
+ File[] listed = testRoot.listFiles();
+ assertNotNull(listed);
+ assertEquals(0, listed.length);
+
+ FsCheckpointStateOutputStream stream = savepointStreamFactory
+ .createCheckpointStateOutputStream(1273, 19231);
+
+ stream.write(1);
+
+ FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+ listed = testRoot.listFiles();
+ assertNotNull(listed);
+ assertEquals(1, listed.length);
+ assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 187163d..89ae5da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -91,7 +92,7 @@ public class TaskAsyncCallTest {
awaitLatch.await();
for (int i = 1; i <= NUM_CALLS; i++) {
- task.triggerCheckpointBarrier(i, 156865867234L);
+ task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
}
triggerLatch.await();
@@ -121,7 +122,7 @@ public class TaskAsyncCallTest {
awaitLatch.await();
for (int i = 1; i <= NUM_CALLS; i++) {
- task.triggerCheckpointBarrier(i, 156865867234L);
+ task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
task.notifyCheckpointComplete(i);
}
@@ -226,7 +227,7 @@ public class TaskAsyncCallTest {
public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
lastCheckpointId++;
if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
if (lastCheckpointId == NUM_CALLS) {
@@ -243,7 +244,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 144247f..05fda28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators;
+import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.flink.annotation.PublicEvolving;
@@ -36,6 +37,8 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT>
}
@Override
- public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
+ public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
+ CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
+
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
- checkpointStreamFactory,
+ factory,
keyGroupRange,
getContainingTask().getCancelables())) {
@@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT>
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
- operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+ operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
- keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+ keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
@@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT>
@SuppressWarnings("deprecation")
@Deprecated
@Override
- public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+ public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
if (this instanceof StreamCheckpointedOperator) {
+ CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
- checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ factory.createCheckpointStateOutputStream(checkpointId, timestamp);
getContainingTask().getCancelables().registerClosable(outStream);
@@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT>
@Override
public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {}
+ /**
+ * Returns a checkpoint stream factory for the provided options.
+ *
+ * <p>For {@link CheckpointType#FULL_CHECKPOINT} this returns the shared
+ * factory of this operator.
+ *
+ * <p>For {@link CheckpointType#SAVEPOINT} it creates a custom factory per
+ * savepoint.
+ *
+ * @param checkpointOptions Options for the checkpoint
+ * @return Checkpoint stream factory for the checkpoints
+ * @throws IOException Failures while creating a new stream factory are forwarded
+ */
+ @VisibleForTesting
+ CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
+ CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+ if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+ return checkpointStreamFactory;
+ } else if (checkpointType == CheckpointType.SAVEPOINT) {
+ return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());
+ } else {
+ throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
+ }
+ }
+
// ------------------------------------------------------------------------
// Properties and Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..83697ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
import java.util.concurrent.RunnableFuture;
/**
- * Result of {@link AbstractStreamOperator#snapshotState}.
+ * Result of {@link StreamOperator#snapshotState}.
*/
public class OperatorSnapshotResult {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d8e4d08..006e910 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable {
*
* @throws Exception exception that happened during snapshotting.
*/
- OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+ OperatorSnapshotResult snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
@@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable {
*/
@SuppressWarnings("deprecation")
@Deprecated
- StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
+ StreamStateHandle snapshotLegacyOperatorState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions) throws Exception;
/**
* Provides state handles to restore the operator state.
@@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable {
void setChainingStrategy(ChainingStrategy strategy);
MetricGroup getMetricGroup();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 611bd44..2da8389 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -368,7 +368,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
.setBytesBufferedInAlignment(bytesBuffered)
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ checkpointMetaData,
+ checkpointBarrier.getCheckpointOptions(),
+ checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 77608c6..8b1b65b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -132,7 +133,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
+ notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
return;
}
@@ -170,7 +171,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
LOG.debug("Received all barriers for checkpoint {}", barrierId);
}
- notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+ notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
}
}
}
@@ -248,14 +249,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
- private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+ private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4f07182..dd93592 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
- public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+ public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
try {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 62cfb8f..938ffd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
@@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private TaskStateHandles restoreStateHandles;
-
/** The currently active background materialization threads */
private final CloseableRegistry cancelables = new CloseableRegistry();
@@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
try {
// No alignment if we inject a checkpoint
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
.setBytesBufferedInAlignment(0L)
.setAlignmentDurationNanos(0L);
- return performCheckpoint(checkpointMetaData, checkpointMetrics);
+ return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (Exception e) {
// propagate exceptions only if the task is still in "running" state
@@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
@Override
- public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+ public void triggerCheckpointOnBarrier(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
try {
- performCheckpoint(checkpointMetaData, checkpointMetrics);
+ performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
}
catch (CancelTaskException e) {
throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
- LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+ private boolean performCheckpoint(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
+ LOG.debug("Starting checkpoint ({}) {} on task {}",
+ checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
synchronized (lock) {
if (isRunning) {
@@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// Given this, we immediately emit the checkpoint barriers, so the downstream operators
// can start their checkpoint work as soon as possible
operatorChain.broadcastCheckpointBarrier(
- checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
+ checkpointMetaData.getCheckpointId(),
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions);
- checkpointState(checkpointMetaData, checkpointMetrics);
+ checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
else {
@@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
- private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
- CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
+ private void checkpointState(
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) throws Exception {
+
+ CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
+ this,
+ checkpointMetaData,
+ checkpointOptions,
+ checkpointMetrics);
+
checkpointingOperation.executeCheckpointing();
}
@@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return stateBackend.createStreamFactory(
getEnvironment().getJobID(),
createOperatorIdentifier(operator, configuration.getVertexID()));
+ }
+ public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
+ return stateBackend.createSavepointStreamFactory(
+ getEnvironment().getJobID(),
+ createOperatorIdentifier(operator, configuration.getVertexID()),
+ targetLocation);
}
private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
@@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final StreamTask<?, ?> owner;
private final CheckpointMetaData checkpointMetaData;
+ private final CheckpointOptions checkpointOptions;
private final CheckpointMetrics checkpointMetrics;
private final StreamOperator<?>[] allOperators;
@@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final List<StreamStateHandle> nonPartitionedStates;
private final List<OperatorSnapshotResult> snapshotInProgressList;
- public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
+ public CheckpointingOperation(
+ StreamTask<?, ?> owner,
+ CheckpointMetaData checkpointMetaData,
+ CheckpointOptions checkpointOptions,
+ CheckpointMetrics checkpointMetrics) {
+
this.owner = Preconditions.checkNotNull(owner);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+ this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
this.allOperators = owner.operatorChain.getAllOperators();
this.nonPartitionedStates = new ArrayList<>(allOperators.length);
@@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
@SuppressWarnings("deprecation")
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
- // first call the legacy checkpoint code paths
+ // first call the legacy checkpoint code paths
nonPartitionedStates.add(op.snapshotLegacyOperatorState(
checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getTimestamp()));
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions));
OperatorSnapshotResult snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
- checkpointMetaData.getTimestamp());
+ checkpointMetaData.getTimestamp(),
+ checkpointOptions);
snapshotInProgressList.add(snapshotInProgress);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 6751617..51b9d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -100,4 +100,4 @@ public class ListCheckpointedTest {
return restored;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 274611a..8507200 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -17,12 +17,34 @@
*/
package org.apache.flink.streaming.api.operators;
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
/**
* Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
* tests timers and state and whether they are correctly checkpointed/restored
@@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
verify(context).close();
}
@@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
// lets fail when calling the actual snapshotState method
doThrow(failingException).when(operator).snapshotState(eq(context));
try {
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
@@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+
+ // The amount of mocking in this test makes it necessary to make the
+ // getCheckpointStreamFactory method visible for the test and to
+ // overwrite its behaviour.
+ when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
+
doReturn(containingTask).when(operator).getContainingTask();
RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
- when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+ when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
- when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+ when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
try {
- operator.snapshotState(checkpointId, timestamp);
+ operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index c4ddea8..d331171 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
"org.apache.flink.streaming.api.operators.Output], " +
- "snapshotLegacyOperatorState[long, long], " +
- "snapshotState[long, long]]";
+ "snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
+ "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
try {
runStarted.await();
if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
- new CheckpointMetaData(0, System.currentTimeMillis()))) {
+ new CheckpointMetaData(0, System.currentTimeMillis()),
+ CheckpointOptions.forFullCheckpoint())) {
LifecycleTrackingStreamSource.runFinish.trigger();
}
} catch (Exception e) {
@@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
}
@Override
- public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+ public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
- return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+ return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b1689f9..ab4258f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest {
return value;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 907f8f1..c4867ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
- task.triggerCheckpoint(checkpointMetaData);
+ task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
env.getCheckpointLatch().await();
@@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
// trigger the checkpoint while processing stream elements
- restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp));
+ restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint());
restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
[2/7] flink git commit: [FLINK-5763] [checkpoints] Followup on adding
CheckpointOptions
Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions
- Add a test that validates the checkpoint type ordinals are not changed
- Change target location writing from 'writeUTF' to 'StringUtils.writeString'.
- Pull out the coding charset as a constant in 'EventSerializer'
- Simplify the directory creation in 'SavepointStore'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b
Branch: refs/heads/master
Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5
Parents: 6e7a917
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 14:11:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100
----------------------------------------------------------------------
.../flink/core/io/IOReadableWritable.java | 7 +-
.../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++--
.../checkpoint/savepoint/SavepointStore.java | 70 ++++++++++----------
.../io/network/api/CheckpointBarrier.java | 16 +++--
.../api/serialization/EventSerializer.java | 17 ++---
.../runtime/util/DataInputDeserializer.java | 6 +-
.../runtime/checkpoint/CheckpointTypeTest.java | 42 ++++++++++++
.../io/network/api/CheckpointBarrierTest.java | 2 +-
8 files changed, 145 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index a192a21..a38952e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView;
/**
* This interface must be implemented by every class whose objects have to be serialized to their binary representation
* and vice-versa. In particular, records have to implement this interface in order to specify how their data can be
- * transfered
- * to a binary representation.
- * When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor!
+ * transferred to a binary representation.
+ *
+ * <p>When implementing this Interface make sure that the implementing class has a default
+ * (zero-argument) constructor!
*/
@Public
public interface IOReadableWritable {
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 3c32d77..fc945c6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Utility class to convert objects into strings in vice-versa.
*/
@@ -302,19 +307,46 @@ public final class StringUtils {
}
return new String(data);
}
-
+
+ /**
+ * Writes a String to the given output.
+ * The written string can be read with {@link #readNullableString(DataInputView)}.
+ *
+ * @param str The string to write
+ * @param out The output to write to
+ *
+ * @throws IOException Thrown, if the writing or the serialization fails.
+ */
+ public static void writeString(@Nonnull String str, DataOutputView out) throws IOException {
+ checkNotNull(str);
+ StringValue.writeString(str, out);
+ }
+
+ /**
+ * Reads a non-null String from the given input.
+ *
+ * @param in The input to read from
+ * @return The deserialized String
+ *
+ * @throws IOException Thrown, if the reading or the deserialization fails.
+ */
+ public static String readString(DataInputView in) throws IOException {
+ return StringValue.readString(in);
+ }
+
/**
* Writes a String to the given output. The string may be null.
* The written string can be read with {@link #readNullableString(DataInputView)}-
*
* @param str The string to write, or null.
* @param out The output to write to.
- * @throws IOException Throws if the writing or the serialization fails.
+ *
+ * @throws IOException Thrown, if the writing or the serialization fails.
*/
- public static void writeNullableString(String str, DataOutputView out) throws IOException {
+ public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException {
if (str != null) {
out.writeBoolean(true);
- StringValue.writeString(str, out);
+ writeString(str, out);
} else {
out.writeBoolean(false);
}
@@ -326,11 +358,12 @@ public final class StringUtils {
*
* @param in The input to read from.
* @return The deserialized string, or null.
- * @throws IOException Throws if the reading or the deserialization fails.
+ *
+ * @throws IOException Thrown, if the reading or the deserialization fails.
*/
- public static String readNullableString(DataInputView in) throws IOException {
+ public static @Nullable String readNullableString(DataInputView in) throws IOException {
if (in.readBoolean()) {
- return StringValue.readString(in);
+ return readString(in);
} else {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 0caf5b2..95370a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,24 +18,27 @@
package org.apache.flink.runtime.checkpoint.savepoint;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Utilities for storing and loading savepoint meta data files.
*
@@ -65,7 +68,10 @@ public class SavepointStore {
* @throws IOException FileSystem operation failures are forwarded
*/
public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
- String prefix;
+ final Path basePath = new Path(baseDirectory);
+ final FileSystem fs = basePath.getFileSystem();
+
+ final String prefix;
if (jobId == null) {
prefix = "savepoint-";
} else {
@@ -73,33 +79,21 @@ public class SavepointStore {
}
Exception latestException = null;
- Path savepointDirectory = null;
-
- FileSystem fs = null;
// Try to create a FS output stream
for (int attempt = 0; attempt < 10; attempt++) {
- Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
-
- if (fs == null) {
- fs = FileSystem.get(path.toUri());
- }
+ Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
try {
if (fs.mkdirs(path)) {
- savepointDirectory = path;
- break;
+ return path.toString();
}
} catch (Exception e) {
latestException = e;
}
}
- if (savepointDirectory == null) {
- throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
- } else {
- return savepointDirectory.getPath();
- }
+ throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
}
/**
@@ -121,20 +115,22 @@ public class SavepointStore {
* @param directory Target directory to store savepoint in
* @param savepoint Savepoint to be stored
* @return Path of stored savepoint
- * @throws Exception Failures during store are forwarded
+ * @throws IOException Failures during store are forwarded
*/
public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
checkNotNull(directory, "Target directory");
checkNotNull(savepoint, "Savepoint");
- Path basePath = new Path(directory);
- FileSystem fs = FileSystem.get(basePath.toUri());
+ final Path basePath = new Path(directory);
+ final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
- Path path = new Path(basePath, META_DATA_FILE);
- FSDataOutputStream fdos = fs.create(path, false);
+ final FileSystem fs = FileSystem.get(basePath.toUri());
boolean success = false;
- try (DataOutputStream dos = new DataOutputStream(fdos)) {
+ try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE);
+ DataOutputStream dos = new DataOutputStream(fdos))
+ {
+
// Write header
dos.writeInt(MAGIC_NUMBER);
dos.writeInt(savepoint.getVersion());
@@ -143,14 +139,18 @@ public class SavepointStore {
SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
serializer.serialize(savepoint, dos);
success = true;
- } finally {
- if (!success && fs.exists(path)) {
- if (!fs.delete(path, true)) {
- LOG.warn("Failed to delete file {} after failed write.", path);
+ }
+ finally {
+ if (!success && fs.exists(metadataFilePath)) {
+ if (!fs.delete(metadataFilePath, true)) {
+ LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
}
}
}
+ // we return the savepoint directory path here!
+ // The directory path also works to resume from and is more elegant than the direct
+ // metadata file pointer
return basePath.toString();
}
@@ -159,7 +159,7 @@ public class SavepointStore {
*
* @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
* @return The loaded savepoint
- * @throws Exception Failures during load are forwared
+ * @throws IOException Failures during load are forwarded
*/
public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
@@ -207,7 +207,7 @@ public class SavepointStore {
* Removes the savepoint meta data w/o loading and disposing it.
*
* @param path Path of savepoint to remove
- * @throws Exception Failures during disposal are forwarded
+ * @throws IOException Failures during disposal are forwarded
*/
public static void removeSavepointFile(String path) throws IOException {
Preconditions.checkNotNull(path, "Path");
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 0752897..a42c25d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api;
import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
import java.io.IOException;
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.event.RuntimeEvent;
+import org.apache.flink.util.StringUtils;
/**
* Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent;
*
* <p>Once an operator has received a checkpoint barrier from all its input channels, it
* knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
- * behavior and broadcast the barrier to downstream operators.</p>
+ * behavior and broadcast the barrier to downstream operators.
*
* <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
- * is complete (exactly once)</p>
+ * is complete (exactly once).
*
- * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.
*/
public class CheckpointBarrier extends RuntimeEvent {
@@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent {
return;
} else if (checkpointType == CheckpointType.SAVEPOINT) {
String targetLocation = checkpointOptions.getTargetLocation();
- assert(targetLocation != null);
- out.writeUTF(targetLocation);
+ checkState(targetLocation != null);
+ StringUtils.writeString(targetLocation, out);
} else {
throw new IOException("Unknown CheckpointType " + checkpointType);
}
@@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent {
timestamp = in.readLong();
int typeOrdinal = in.readInt();
- checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+ checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
checkpointOptions = CheckpointOptions.forFullCheckpoint();
} else if (checkpointType == CheckpointType.SAVEPOINT) {
- String targetLocation = in.readUTF();
+ String targetLocation = StringUtils.readString(in);
checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
} else {
throw new IOException("Illegal CheckpointType " + checkpointType);
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 223cbfe..3adf864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions;
*/
public class EventSerializer {
+ private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
+
private static final int END_OF_PARTITION_EVENT = 0;
private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -77,16 +79,16 @@ public class EventSerializer {
} else if (checkpointType == CheckpointType.SAVEPOINT) {
String targetLocation = checkpointOptions.getTargetLocation();
assert(targetLocation != null);
- byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+ byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);
- buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+ buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
buf.putLong(12, barrier.getTimestamp());
buf.putInt(20, checkpointType.ordinal());
- buf.putInt(24, bytes.length);
- for (int i = 0; i < bytes.length; i++) {
- buf.put(28 + i, bytes[i]);
+ buf.putInt(24, locationBytes.length);
+ for (int i = 0; i < locationBytes.length; i++) {
+ buf.put(28 + i, locationBytes[i]);
}
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
@@ -204,8 +206,7 @@ public class EventSerializer {
CheckpointOptions checkpointOptions;
int checkpointTypeOrdinal = buffer.getInt();
- Preconditions.checkElementIndex(type, CheckpointType.values().length,
- "Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+ Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
@@ -214,7 +215,7 @@ public class EventSerializer {
int len = buffer.getInt();
byte[] bytes = new byte[len];
buffer.get(bytes);
- String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+ String targetLocation = new String(bytes, STRING_CODING_CHARSET);
checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 0f99496..4e8871a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
// ------------------------------------------------------------------------
public DataInputDeserializer() {}
-
+
+ public DataInputDeserializer(byte[] buffer) {
+ setBuffer(buffer, 0, buffer.length);
+ }
+
public DataInputDeserializer(byte[] buffer, int start, int len) {
setBuffer(buffer, start, len);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
new file mode 100644
index 0000000..dfbde5e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTypeTest {
+
+ /**
+ * This test validates that the order of enumeration constants is not changed, because the
+ * ordinal of that enum is used in serialization.
+ *
+ * <p>It is still possible to edit both the ordinal and this test, but the test adds
+ * a level of safety, and should make developers stumble over this when attempting
+ * to adjust the enumeration.
+ */
+ @Test
+ public void testOrdinalsAreConstant() {
+ assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+ assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index dd5b0b6..ad9fc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -49,7 +49,7 @@ public class CheckpointBarrierTest {
DataOutputSerializer out = new DataOutputSerializer(1024);
barrier.write(out);
- DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
CheckpointBarrier deserialized = new CheckpointBarrier();
deserialized.read(in);