You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:29 UTC

[1/7] flink git commit: [hotfix] [docs] Fix JavaDoc errors in 'flink-streaming-java'

Repository: flink
Updated Branches:
  refs/heads/master 30c9e2b68 -> 417597fbf


[hotfix] [docs] Fix JavaDoc errors in 'flink-streaming-java'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f9f38bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f9f38bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f9f38bf

Branch: refs/heads/master
Commit: 1f9f38bf6312529af3ac527bf2f80f2ecee4d62b
Parents: 30c9e2b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 12:50:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 17:44:25 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  4 +--
 .../api/datastream/WindowedStream.java          |  4 +--
 .../environment/StreamExecutionEnvironment.java |  7 +++---
 .../api/functions/async/AsyncFunction.java      | 26 ++++++++++----------
 .../api/operators/StreamSourceContexts.java     |  6 ++---
 .../api/operators/async/AsyncWaitOperator.java  | 11 +++++----
 .../operators/windowing/MergingWindowSet.java   |  2 +-
 7 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index c3c7424..4f4546e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -826,7 +826,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 *
-	 * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction)} instead.
+	 * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction)} instead.
 	 */
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function) {
@@ -851,7 +851,7 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 *
-	 * @deprecated Use {@link #fold(R, FoldFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead.
+	 * @deprecated Use {@link #fold(Object, FoldFunction, AllWindowFunction, TypeInformation, TypeInformation)} instead.
 	 */
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, AllWindowFunction<R, R, W> function, TypeInformation<R> resultType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 6809df0..b28434c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1284,7 +1284,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
 	 *
-	 * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction)} instead.
+	 * @deprecated Use {@link #fold(Object, FoldFunction, WindowFunction)} instead.
 	 */
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function) {
@@ -1309,7 +1309,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
 	 *
-	 * @deprecated Use {@link #fold(R, FoldFunction, WindowFunction, TypeInformation, TypeInformation)} instead.
+	 * @deprecated Use {@link #fold(Object, FoldFunction, WindowFunction, TypeInformation, TypeInformation)} instead.
 	 */
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dab0a06..e299e84 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -484,7 +485,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The number of times the system will try to re-execute failed tasks.
 	 *
 	 * @deprecated This method will be replaced by {@link #setRestartStrategy}. The
-	 * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
+	 * {@link RestartStrategies#fixedDelayRestart(int, Time)} contains the number of
 	 * execution retries.
 	 */
 	@Deprecated
@@ -500,9 +501,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return The number of times the system will try to re-execute failed tasks.
 	 *
-	 * @deprecated This method will be replaced by {@link #getRestartStrategy}. The
-	 * {@link RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
-	 * execution retries.
+	 * @deprecated This method will be replaced by {@link #getRestartStrategy}.
 	 */
 	@Deprecated
 	@PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 4de2db1..419c3ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -26,34 +26,34 @@ import java.io.Serializable;
 
 /**
  * A function to trigger Async I/O operation.
- * <p>
- * For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
+ * 
+ * <p>For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
  * the result can be collected by calling {@link AsyncCollector#collect}. For each async
  * operation, its context is stored in the operator immediately after invoking
  * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
- * <p>
- * {@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
+ * 
+ * <p>{@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
  * An error can also be propagate to the async IO operator by
  * {@link AsyncCollector#collect(Throwable)}.
  *
- * <p>
- * Callback example usage:
+ * <p>Callback example usage:
+ * 
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
- *   @Override
+ * 
  *   public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
  *     HBaseCallback cb = new HBaseCallback(collector);
  *     Get get = new Get(Bytes.toBytes(row));
  *     hbase.asyncGet(get, cb);
  *   }
  * }
- * </pre>
+ * }</pre>
  *
- * <p>
- * Future example usage:
+ * <p>Future example usage:
+ * 
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
- *   @Override
+ * 
  *   public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
  *     Get get = new Get(Bytes.toBytes(row));
  *     ListenableFuture<Result> future = hbase.asyncGet(get);
@@ -68,14 +68,14 @@ import java.io.Serializable;
  *     });
  *   }
  * }
- * </pre>
+ * }</pre>
  *
  * @param <IN> The type of the input elements.
  * @param <OUT> The type of the returned elements.
  */
-
 @PublicEvolving
 public interface AsyncFunction<IN, OUT> extends Function, Serializable {
+
 	/**
 	 * Trigger async operation for each stream input.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 98281c4..e4d051c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -38,9 +38,9 @@ public class StreamSourceContexts {
 	 * Depending on the {@link TimeCharacteristic}, this method will return the adequate
 	 * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is:
 	 * <ul>
-	 *     <li>{@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}</li>
-	 *     <li>{@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}</li>
-	 *     <li>{@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}</li>
+	 *     <li>{@link TimeCharacteristic#IngestionTime} = {@code AutomaticWatermarkContext}</li>
+	 *     <li>{@link TimeCharacteristic#ProcessingTime} = {@code NonTimestampContext}</li>
+	 *     <li>{@link TimeCharacteristic#EventTime} = {@code ManualWatermarkContext}</li>
 	 * </ul>
 	 * */
 	public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a70d825..4cf79b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -61,13 +62,13 @@ import java.util.concurrent.TimeoutException;
  * Within the async function, the user can complete the async collector arbitrarily. Once the async
  * collector has been completed, the result is emitted by the operator's emitter to downstream
  * operators.
- * <p>
- * The operator offers different output modes depending on the chosen
- * {@link AsyncDataStream.OutputMode}. In order to give exactly once processing guarantees, the
+ * 
+ * <p>The operator offers different output modes depending on the chosen
+ * {@link OutputMode}. In order to give exactly once processing guarantees, the
  * operator stores all currently in-flight {@link StreamElement} in it's operator state. Upon
  * recovery the recorded set of stream elements is replayed.
- * <p>
- * In case of chaining of this operator, it has to be made sure that the operators in the chain are
+ * 
+ * <p>In case of chaining of this operator, it has to be made sure that the operators in the chain are
  * opened tail to head. The reason for this is that an opened {@link AsyncWaitOperator} starts
  * already emitting recovered {@link StreamElement} to downstream operators.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/1f9f38bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
index b79a3fa..de37cb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -42,7 +42,7 @@ import java.util.Map;
  *
  * <p>A new window can be added to the set of in-flight windows using
  * {@link #addWindow(Window, MergeFunction)}. This might merge other windows and the caller
- * must react accordingly in the {@link MergeFunction#merge(Object, Collection, Object, Collection)
+ * must react accordingly in the {@link MergeFunction#merge(Object, Collection, Object, Collection)}
  * and adjust the outside view of windows and state.
  *
  * <p>Windows can be removed from the set of windows using {@link #retireWindow(Window)}.


[7/7] flink git commit: [hotfix] [checkpoints] Remove equals()/hashCode() from CompletedCheckpoint as semantic equality is not well defined.

Posted by se...@apache.org.
[hotfix] [checkpoints] Remove equals()/hashCode() from CompletedCheckpoint as semantic equality is not well defined.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/417597fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/417597fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/417597fb

Branch: refs/heads/master
Commit: 417597fbf71ac9062bed1abf04139d46ec830ec4
Parents: 8ffe75a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 22:19:11 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:50 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java | 24 +-------------------
 1 file changed, 1 insertion(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/417597fb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 53d888e..db86484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -23,13 +23,13 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -206,29 +206,7 @@ public class CompletedCheckpoint implements Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof CompletedCheckpoint) {
-			CompletedCheckpoint other = (CompletedCheckpoint) obj;
-
-			return job.equals(other.job) && checkpointID == other.checkpointID &&
-				timestamp == other.timestamp && duration == other.duration &&
-				taskStates.equals(other.taskStates);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (this.checkpointID ^ this.checkpointID >>> 32) +
-			31 * ((int) (this.timestamp ^ this.timestamp >>> 32) +
-				31 * ((int) (this.duration ^ this.duration >>> 32) +
-					31 * Objects.hash(job, taskStates)));
-	}
-
-	@Override
 	public String toString() {
 		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
 	}
-
 }


[5/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Add CheckpointOptions

Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
to barrier injecting tasks) and barriers (flowing inline with the data:

```java
public class CheckpointOptions {

  // Type of checkpoint
  // => FULL_CHECKPOINT
  // => SAVEPOINT
  @NonNull
  CheckpointType getCheckpointType();

  // Custom target location. This is a String, because for future
  // backends it can be a logical location like a DB table.
  @Nullable
  String getTargetLocation();

}
```

This class would be the place to define more options for performing the
checkpoints (for example for incremental checkpoints).

These options are forwarded via the `StreamTask` to the `StreamOperator`s and
`Snapshotable` backends. The `AbstractStreamOperator` checks the options and
either i) forwards the shared per operator `CheckpointStreamFactory` (as of

For this, the state backends provide the following new method:

```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```

The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular sub folders
per checkpoint).

We end up with the following directory layout for savepoints:

```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
       |
       +- _metadata (one per savepoint)
       +- :uuid (one data file per StreamTask)
       +- ...
       +- :uuid
```


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174

Branch: refs/heads/master
Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18
Parents: 1f9f38b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Feb 16 17:56:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkITCase.java        |   1 -
 .../state/RocksDBKeyedStateBackend.java         |   5 +-
 .../streaming/state/RocksDBStateBackend.java    |   9 ++
 .../state/RocksDBAsyncSnapshotTest.java         |   8 +-
 .../state/RocksDBStateBackendTest.java          |  15 +-
 .../checkpoint/CheckpointCoordinator.java       |  56 ++++++--
 .../runtime/checkpoint/CheckpointOptions.java   | 108 +++++++++++++++
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |   3 +-
 .../checkpoint/savepoint/SavepointStore.java    | 137 +++++++++++++------
 .../flink/runtime/executiongraph/Execution.java |   6 +-
 .../io/network/api/CheckpointBarrier.java       |  44 +++++-
 .../api/serialization/EventSerializer.java      |  59 +++++++-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   7 +-
 .../slots/ActorTaskManagerGateway.java          |   6 +-
 .../jobmanager/slots/TaskManagerGateway.java    |   5 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   3 +-
 .../messages/checkpoint/TriggerCheckpoint.java  |  19 ++-
 .../state/AbstractKeyedStateBackend.java        |   3 +-
 .../runtime/state/AbstractStateBackend.java     |   8 ++
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/Snapshotable.java       |   5 +-
 .../flink/runtime/state/StateBackend.java       |  22 +++
 .../filesystem/FsCheckpointStreamFactory.java   |  21 +--
 .../filesystem/FsSavepointStreamFactory.java    |  58 ++++++++
 .../state/filesystem/FsStateBackend.java        |   9 ++
 .../state/heap/HeapKeyedStateBackend.java       |   4 +-
 .../state/memory/MemoryStateBackend.java        |   9 ++
 .../runtime/taskexecutor/TaskExecutor.java      |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  53 ++++---
 .../checkpoint/CheckpointOptionsTest.java       |  48 +++++++
 .../checkpoint/CheckpointStatsHistoryTest.java  |   1 +
 .../savepoint/MigrationV0ToV1Test.java          |   2 +-
 .../savepoint/SavepointLoaderTest.java          |   4 +-
 .../savepoint/SavepointStoreTest.java           |  48 +++++--
 .../io/network/api/CheckpointBarrierTest.java   |  61 +++++++++
 .../api/serialization/EventSerializerTest.java  |  45 ++++--
 .../io/network/api/writer/RecordWriterTest.java |   5 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../messages/CheckpointMessagesTest.java        |   3 +-
 .../runtime/state/OperatorStateBackendTest.java |   3 +-
 .../runtime/state/StateBackendTestBase.java     |  39 +++---
 .../FsSavepointStreamFactoryTest.java           |  67 +++++++++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   9 +-
 .../api/operators/AbstractStreamOperator.java   |  43 +++++-
 .../api/operators/OperatorSnapshotResult.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |  12 +-
 .../streaming/runtime/io/BarrierBuffer.java     |   5 +-
 .../streaming/runtime/io/BarrierTracker.java    |   9 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |  65 +++++++--
 .../api/checkpoint/ListCheckpointedTest.java    |   2 +-
 .../operators/AbstractStreamOperatorTest.java   |  65 +++++----
 .../AbstractUdfStreamOperatorLifecycleTest.java |  12 +-
 .../WrappingFunctionSnapshotRestoreTest.java    |   2 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   5 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |  13 +-
 .../io/BarrierBufferMassiveRandomTest.java      |   3 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  33 ++---
 .../runtime/io/BarrierTrackerTest.java          |   7 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   8 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  31 +++--
 .../runtime/tasks/SourceStreamTaskTest.java     |   3 +-
 .../StreamTaskCancellationBarrierTest.java      |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  37 ++---
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  29 ++--
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   7 +-
 .../test/checkpointing/SavepointITCase.java     |  51 ++++---
 .../streaming/runtime/StateBackendITCase.java   |   7 +-
 74 files changed, 1173 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 80ae294..72f2f21 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -941,7 +941,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-
 	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
 		private String key;
 		private String expect;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a0efe78..bd8d4dd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * @param checkpointId  The Id of the checkpoint.
 	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 * @param checkpointOptions Options for how to perform this checkpoint.
 	 * @return Future to the state handle of the snapshot data.
 	 * @throws Exception
 	 */
@@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public RunnableFuture<KeyGroupsStateHandle> snapshot(
 			final long checkpointId,
 			final long timestamp,
-			final CheckpointStreamFactory streamFactory) throws Exception {
+			final CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		long startTime = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6b09a8a..3fd5d0f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -219,6 +219,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bce8028..90de7a6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
 		task.cancel();
@@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest {
 			StringSerializer.INSTANCE,
 			new ValueStateDescriptor<>("foobar", String.class));
 
-		RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+			checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
 
 		try {
 			FutureUtil.runIfNotDoneAndGet(snapshotFuture);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index dc90666..c7b5c20 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testRunningSnapshotAfterBackendClosed() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+			CheckpointOptions.forFullCheckpoint());
 
 		RocksDB spyDB = keyedStateBackend.db;
 
@@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testReleasingSnapshotAfterBackendClosed() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+			CheckpointOptions.forFullCheckpoint());
 
 		RocksDB spyDB = keyedStateBackend.db;
 
@@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testDismissingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		snapshot.cancel(true);
 		verifyRocksObjectsReleased();
 	}
@@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testDismissingSnapshotNotRunnable() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		snapshot.cancel(true);
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
@@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testCompletingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
 		waiter.await(); // wait for snapshot to run
@@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testCancelRunningSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
 		waiter.await(); // wait for snapshot to run

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 36649ad..c1c65b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -296,15 +298,42 @@ public class CheckpointCoordinator {
 		checkNotNull(targetDirectory, "Savepoint target directory");
 
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);
 
-		if (result.isSuccess()) {
-			return result.getPendingCheckpoint().getCompletionFuture();
-		}
-		else {
-			Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
-			return FlinkCompletableFuture.completedExceptionally(cause);
+		// Create the unique savepoint directory
+		final String savepointDirectory = SavepointStore
+			.createSavepointDirectory(targetDirectory, job);
+
+		CheckpointTriggerResult triggerResult = triggerCheckpoint(
+			timestamp,
+			props,
+			savepointDirectory,
+			false);
+
+		Future<CompletedCheckpoint> result;
+
+		if (triggerResult.isSuccess()) {
+			result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+		} else {
+			Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
+			result = FlinkCompletableFuture.completedExceptionally(cause);
 		}
+
+		// Make sure to remove the created base directory on Exceptions
+		result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+			@Override
+			public Void apply(Throwable value) {
+				try {
+					SavepointStore.deleteSavepointDirectory(savepointDirectory);
+				} catch (Throwable t) {
+					LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+						+ " after failed savepoint.", t);
+				}
+
+				return null;
+			}
+		}, executor);
+
+		return result;
 	}
 
 	/**
@@ -517,9 +546,16 @@ public class CheckpointCoordinator {
 				}
 				// end of lock scope
 
+				CheckpointOptions checkpointOptions;
+				if (!props.isSavepoint()) {
+					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+				} else {
+					checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+				}
+
 				// send the messages to the tasks that trigger their checkpoint
 				for (Execution execution: executions) {
-					execution.triggerCheckpoint(checkpointID, timestamp);
+					execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
 				}
 
 				numUnsuccessfulCheckpointsTriggers.set(0);
@@ -756,7 +792,7 @@ public class CheckpointCoordinator {
 
 			triggerQueuedRequests();
 		}
-		
+
 		// record the time when this was completed, to calculate
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
@@ -1030,7 +1066,7 @@ public class CheckpointCoordinator {
 			final ExecutionAttemptID executionAttemptID,
 			final long checkpointId,
 			final StateObject stateObject) {
-		
+
 		if (stateObject != null) {
 			executor.execute(new Runnable() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
new file mode 100644
index 0000000..cb98d10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing the checkpoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+	private static final long serialVersionUID = 5010126558083292915L;
+
+	/** Type of the checkpoint. */
+	@Nonnull
+	private final CheckpointType checkpointType;
+
+	/** Target location for the checkpoint. */
+	@Nullable
+	private final String targetLocation;
+
+	private CheckpointOptions(
+			@Nonnull CheckpointType checkpointType,
+			String targetLocation) {
+		this.checkpointType = checkNotNull(checkpointType);
+		this.targetLocation = targetLocation;
+	}
+
+	/**
+	 * Returns the type of checkpoint to perform.
+	 *
+	 * @return Type of checkpoint to perform.
+	 */
+	@Nonnull
+	public CheckpointType getCheckpointType() {
+		return checkpointType;
+	}
+
+	/**
+	 * Returns a custom target location or <code>null</code> if none
+	 * was specified.
+	 *
+	 * @return A custom target location or <code>null</code>.
+	 */
+	@Nullable
+	public String getTargetLocation() {
+		return targetLocation;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointOptions(" + checkpointType + ")";
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+	public static CheckpointOptions forFullCheckpoint() {
+		return FULL_CHECKPOINT;
+	}
+
+	public static CheckpointOptions forSavepoint(String targetDirectory) {
+		checkNotNull(targetDirectory, "targetDirectory");
+		return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 *  The type of checkpoint to perform.
+	 */
+	public enum CheckpointType {
+
+		/** A full checkpoint. */
+		FULL_CHECKPOINT,
+
+		/** A savepoint. */
+		SAVEPOINT;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
 	void discard() throws Exception {
 		try {
 			if (externalPath != null) {
-				SavepointStore.removeSavepoint(externalPath);
+				SavepointStore.removeSavepointFile(externalPath);
 			}
 
 			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
 					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
 					externalPath = SavepointStore.storeSavepoint(
 							targetDirectory,
-							savepoint);
+							savepoint
+					);
 				} catch (IOException e) {
 					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
  *
  * <p>Stored savepoints have the following format:
  * <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
 	/** Magic number for sanity checks against stored savepoints. */
 	public static final int MAGIC_NUMBER = 0x4960672d;
 
-	/** Prefix for savepoint files. */
-	private static final String prefix = "savepoint-";
+	private static final String META_DATA_FILE = "_metadata ";
 
 	/**
-	 * Stores the savepoint.
+	 * Creates a savepoint directory.
 	 *
-	 * @param targetDirectory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 * @param <T>       Savepoint type
-	 * @return Path of stored savepoint
-	 * @throws Exception Failures during store are forwarded
+	 * @param baseDirectory Base target directory for the savepoint
+	 * @param jobId Optional JobID the savepoint belongs to
+	 * @return The created savepoint directory
+	 * @throws IOException FileSystem operation failures are forwarded
 	 */
-	public static <T extends Savepoint> String storeSavepoint(
-			String targetDirectory,
-			T savepoint) throws IOException {
-
-		checkNotNull(targetDirectory, "Target directory");
-		checkNotNull(savepoint, "Savepoint");
+	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
+		String prefix;
+		if (jobId == null) {
+			prefix = "savepoint-";
+		} else {
+			prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
+		}
 
 		Exception latestException = null;
-		Path path = null;
-		FSDataOutputStream fdos = null;
+		Path savepointDirectory = null;
 
 		FileSystem fs = null;
 
 		// Try to create a FS output stream
 		for (int attempt = 0; attempt < 10; attempt++) {
-			path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+			Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
 
 			if (fs == null) {
 				fs = FileSystem.get(path.toUri());
 			}
 
 			try {
-				fdos = fs.create(path, false);
-				break;
+				if (fs.mkdirs(path)) {
+					savepointDirectory = path;
+					break;
+				}
 			} catch (Exception e) {
 				latestException = e;
 			}
 		}
 
-		if (fdos == null) {
-			throw new IOException("Failed to create file output stream at " + path, latestException);
+		if (savepointDirectory == null) {
+			throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
+		} else {
+			return savepointDirectory.getPath();
 		}
+	}
+
+	/**
+	 * Deletes a savepoint directory.
+	 *
+	 * @param savepointDirectory Recursively deletes the given directory
+	 * @throws IOException FileSystem operation failures are forwarded
+	 */
+	public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
+		Path path = new Path(savepointDirectory);
+		FileSystem fs = FileSystem.get(path.toUri());
+		fs.delete(path, true);
+	}
+
+	/**
+	 * Stores the savepoint metadata file.
+	 *
+	 * @param <T>       Savepoint type
+	 * @param directory Target directory to store savepoint in
+	 * @param savepoint Savepoint to be stored
+	 * @return Path of stored savepoint
+	 * @throws Exception Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+		checkNotNull(directory, "Target directory");
+		checkNotNull(savepoint, "Savepoint");
+
+		Path basePath = new Path(directory);
+		FileSystem fs = FileSystem.get(basePath.toUri());
+
+		Path path = new Path(basePath, META_DATA_FILE);
+		FSDataOutputStream fdos = fs.create(path, false);
 
 		boolean success = false;
 		try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
 			}
 		}
 
-		return path.toString();
+		return basePath.toString();
 	}
 
 	/**
 	 * Loads the savepoint at the specified path.
 	 *
-	 * @param path Path of savepoint to load
+	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwared
 	 */
-	public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
+	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
+		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+		Path path = new Path(savepointFileOrDirectory);
+
+		LOG.info("Loading savepoint from {}", path);
 
-		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+		FileSystem fs = FileSystem.get(path.toUri());
+
+		FileStatus status = fs.getFileStatus(path);
+
+		// If this is a directory, we need to find the meta data file
+		if (status.isDir()) {
+			Path candidatePath = new Path(path, META_DATA_FILE);
+			if (fs.exists(candidatePath)) {
+				path = candidatePath;
+				LOG.info("Using savepoint file in {}", path);
+			} else {
+				throw new IOException("Cannot find meta data file in directory " + path
+					+ ". Please try to load the savepoint directly from the meta data file "
+					+ "instead of the directory.");
+			}
+		}
+
+		try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
 			int magicNumber = dis.readInt();
 
 			if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
 	 * @param path Path of savepoint to remove
 	 * @throws Exception Failures during disposal are forwarded
 	 */
-	public static void removeSavepoint(String path) throws IOException {
+	public static void removeSavepointFile(String path) throws IOException {
 		Preconditions.checkNotNull(path, "Path");
 
 		try {
@@ -173,14 +230,4 @@ public class SavepointStore {
 		}
 	}
 
-	private static FSDataInputStream createFsInputStream(Path path) throws IOException {
-		FileSystem fs = FileSystem.get(path.toUri());
-
-		if (fs.exists(path)) {
-			return fs.open(path);
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @param checkpointId of th checkpoint to trigger
 	 * @param timestamp of the checkpoint to trigger
+	 * @param checkpointOptions of the checkpoint to trigger
 	 */
-	public void triggerCheckpoint(long checkpointId, long timestamp) {
+	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
 		} else {
 			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
 				"no longer running.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 59f56b0..0752897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.runtime.io.network.api;
 
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
@@ -43,12 +48,14 @@ public class CheckpointBarrier extends RuntimeEvent {
 
 	private long id;
 	private long timestamp;
+	private CheckpointOptions checkpointOptions;
 
 	public CheckpointBarrier() {}
 
-	public CheckpointBarrier(long id, long timestamp) {
+	public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
 		this.id = id;
 		this.timestamp = timestamp;
+		this.checkpointOptions = checkNotNull(checkpointOptions);
 	}
 
 	public long getId() {
@@ -59,20 +66,53 @@ public class CheckpointBarrier extends RuntimeEvent {
 		return timestamp;
 	}
 
+	public CheckpointOptions getCheckpointOptions() {
+		return checkpointOptions;
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
 	// ------------------------------------------------------------------------
 	
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeLong(id);
 		out.writeLong(timestamp);
+		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+		out.writeInt(checkpointType.ordinal());
+
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			return;
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			String targetLocation = checkpointOptions.getTargetLocation();
+			assert(targetLocation != null);
+			out.writeUTF(targetLocation);
+		} else {
+			throw new IOException("Unknown CheckpointType " + checkpointType);
+		}
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
 		id = in.readLong();
 		timestamp = in.readLong();
+
+		int typeOrdinal = in.readInt();
+		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
+
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			checkpointOptions = CheckpointOptions.forFullCheckpoint();
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			String targetLocation = in.readUTF();
+			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+		} else {
+			throw new IOException("Illegal CheckpointType " + checkpointType);
+		}
 	}
-	
+
+
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 4d9f431..223cbfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import java.nio.charset.Charset;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +37,7 @@ import org.apache.flink.util.InstantiationUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Utility class to serialize and deserialize task events.
@@ -60,10 +64,34 @@ public class EventSerializer {
 		else if (eventClass == CheckpointBarrier.class) {
 			CheckpointBarrier barrier = (CheckpointBarrier) event;
 
-			ByteBuffer buf = ByteBuffer.allocate(20);
-			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
-			buf.putLong(4, barrier.getId());
-			buf.putLong(12, barrier.getTimestamp());
+			CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
+			CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+			ByteBuffer buf;
+			if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+				buf = ByteBuffer.allocate(24);
+				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+				buf.putLong(4, barrier.getId());
+				buf.putLong(12, barrier.getTimestamp());
+				buf.putInt(20, checkpointType.ordinal());
+			} else if (checkpointType == CheckpointType.SAVEPOINT) {
+				String targetLocation = checkpointOptions.getTargetLocation();
+				assert(targetLocation != null);
+				byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+
+				buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+				buf.putLong(4, barrier.getId());
+				buf.putLong(12, barrier.getTimestamp());
+				buf.putInt(20, checkpointType.ordinal());
+				buf.putInt(24, bytes.length);
+				for (int i = 0; i < bytes.length; i++) {
+					buf.put(28 + i, bytes[i]);
+				}
+			} else {
+				throw new IOException("Unknown checkpoint type: " + checkpointType);
+			}
+
 			return buf;
 		}
 		else if (eventClass == EndOfSuperstepEvent.class) {
@@ -172,7 +200,28 @@ public class EventSerializer {
 			else if (type == CHECKPOINT_BARRIER_EVENT) {
 				long id = buffer.getLong();
 				long timestamp = buffer.getLong();
-				return new CheckpointBarrier(id, timestamp);
+
+				CheckpointOptions checkpointOptions;
+
+				int checkpointTypeOrdinal = buffer.getInt();
+				Preconditions.checkElementIndex(type, CheckpointType.values().length,
+					"Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
+
+				if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+				} else if (checkpointType == CheckpointType.SAVEPOINT) {
+					int len = buffer.getInt();
+					byte[] bytes = new byte[len];
+					buffer.get(bytes);
+					String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+
+					checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+				} else {
+					throw new IOException("Unknown checkpoint type: " + checkpointType);
+				}
+
+				return new CheckpointBarrier(id, timestamp, checkpointOptions);
 			}
 			else if (type == END_OF_SUPERSTEP_EVENT) {
 				return EndOfSuperstepEvent.INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 87b66ce..0930011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
 /**
@@ -46,21 +47,23 @@ public interface StatefulTask {
 	 * method.
 	 *
 	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
-	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
+	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
 	 * barriers on all input streams.
 	 * 
 	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
 	 * @param checkpointMetrics Metrics about this checkpoint
 	 * 
 	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 	 */
-	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
+	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
 
 	/**
 	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index fe4ecfb..2876ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.concurrent.Future;
@@ -196,12 +197,13 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			ExecutionAttemptID executionAttemptID,
 			JobID jobId,
 			long checkpointId,
-			long timestamp) {
+			long timestamp,
+			CheckpointOptions checkpointOptions) {
 
 		Preconditions.checkNotNull(executionAttemptID);
 		Preconditions.checkNotNull(jobId);
 
-		actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+		actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index db0a3bf..09f104f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -160,12 +161,14 @@ public interface TaskManagerGateway {
 	 * @param jobId identifying the job to which the task belongs
 	 * @param checkpointId of the checkpoint to trigger
 	 * @param timestamp of the checkpoint to trigger
+	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	void triggerCheckpoint(
 		ExecutionAttemptID executionAttemptID,
 		JobID jobId,
 		long checkpointId,
-		long timestamp);
+		long timestamp,
+		CheckpointOptions checkpointOptions);
 
 	/**
 	 * Request the task manager log from the task manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index eba97d2..28fef27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -123,7 +124,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 //		taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
index 0528755..3477e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.messages.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
@@ -33,9 +36,19 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
 	/** The timestamp associated with the checkpoint */
 	private final long timestamp;
 
-	public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+	/** Options for how to perform the checkpoint. */
+	private final CheckpointOptions checkpointOptions;
+
+	public TriggerCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			long timestamp,
+			CheckpointOptions checkpointOptions) {
+
 		super(job, taskExecutionId, checkpointId);
 		this.timestamp = timestamp;
+		this.checkpointOptions = checkNotNull(checkpointOptions);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -44,6 +57,10 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
 		return timestamp;
 	}
 
+	public CheckpointOptions getCheckpointOptions() {
+		return checkpointOptions;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 3ed49f1..14f897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base implementation of KeyedStateBackend. The state can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}.
  *
  * @param <K> Type of the key by which state is keyed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
+
 	private static final long serialVersionUID = 4620415814639230247L;
 
 	@Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			String operatorIdentifier) throws IOException;
 
 	@Override
+	public abstract CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
+	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 	@Override
 	public RunnableFuture<OperatorStateHandle> snapshot(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (registeredStates.isEmpty()) {
 			return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			return partitionOffsets;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
 /**
  * Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable<S extends StateObject> {
 	 * @param checkpointId  The ID of the checkpoint.
 	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 * @param checkpointOptions Options for how to perform this checkpoint.
 	 * @return A runnable future that will yield a {@link StateObject}.
 	 */
 	RunnableFuture<S> snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception;
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
 	 */
 	CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
 
+	/**
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a savepoint.
+	 *
+	 * <p>This is only called if the triggered checkpoint is a savepoint. Commonly
+	 * this will return the same factory as for regular checkpoints, but maybe
+	 * slightly adjusted.
+	 *
+	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
+	 * @param targetLocation An optional custom location for the savepoint stream.
+	 * 
+	 * @return The stream factory for savepoints.
+	 * 
+	 * @throws IOException Failures during stream creation are forwarded.
+	 */
+	CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
 	// ------------------------------------------------------------------------
 	//  Structure Backends 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 				MAX_FILE_STATE_THRESHOLD);
 		}
 		this.fileStateThreshold = fileStateSizeThreshold;
+
 		Path basePath = checkpointDataUri;
+		filesystem = basePath.getFileSystem();
 
-		Path dir = new Path(basePath, jobId.toString());
+		checkpointDirectory = createBasePath(filesystem, basePath, jobId);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Initializing file stream factory to URI {}.", dir);
+			LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory);
 		}
-
-		filesystem = basePath.getFileSystem();
-		filesystem.mkdirs(dir);
-
-		checkpointDirectory = dir;
 	}
 
 	@Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		checkFileSystemInitialized();
 
-		Path checkpointDir = createCheckpointDirPath(checkpointID);
+		Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 		return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
 	}
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 	}
 
-	private Path createCheckpointDirPath(long checkpointID) {
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		Path dir = new Path(checkpointDirectory, jobID.toString());
+		fs.mkdirs(dir);
+		return dir;
+	}
+
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
 		return new Path(checkpointDirectory, "chk-" + checkpointID);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} that produces streams that write to a
+ * {@link FileSystem}.
+ *
+ * <p>The difference to the parent {@link FsCheckpointStreamFactory} is only
+ * in the created directory layout. All checkpoint files go to the checkpoint
+ * directory.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+	public FsSavepointStreamFactory(
+			Path checkpointDataUri,
+			JobID jobId,
+			int fileStateSizeThreshold) throws IOException {
+
+		super(checkpointDataUri, jobId, fileStateSizeThreshold);
+	}
+
+	@Override
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		// No checkpoint specific directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+
+	@Override
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
+		// No checkpoint specific directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
 import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public RunnableFuture<KeyGroupsStateHandle> snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception {
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (stateTables.isEmpty()) {
 			return new DoneFuture<>(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new MemCheckpointStreamFactory(maxStateSize);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env, JobID jobID,
 			String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ----------------------------------------------------------------------
 
 	@RpcMethod
-	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
 		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
-			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
 			return Acknowledge.get();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param executionAttemptID identifying the task
 	 * @param checkpointID unique id for the checkpoint
 	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @param checkpointOptions for performing the checkpoint
 	 * @return Future acknowledge if the checkpoint has been successfully triggered
 	 */
-	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
 
 	/**
 	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
+	 * @param checkpointOptions Options for performing this checkpoint.
 	 */
-	public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
+	public void triggerCheckpointBarrier(
+			final long checkpointID,
+			long checkpointTimestamp,
+			final CheckpointOptions checkpointOptions) {
+
 		final AbstractInvokable invokable = this.invokable;
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
 
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
 						// activate safety net for checkpointing thread
 						FileSystemSafetyNet.initializeSafetyNetForThread();
 						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
+							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 							if (!success) {
 								checkpointResponder.declineCheckpoint(
 										getJobID(), getExecutionId(), checkpointID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
           savepoint.dispose()
 
           // Remove the header file
-          SavepointStore.removeSavepoint(savepointPath)
+          SavepointStore.removeSavepointFile(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
+        val checkpointOptions = message.getCheckpointOptions
 
         log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.triggerCheckpointBarrier(checkpointId, timestamp)
+          task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
         } else {
           log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
         }


[6/7] flink git commit: [FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.

Posted by se...@apache.org.
[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5

Branch: refs/heads/master
Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279
Parents: df16e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 15:04:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointOptions.java   |  2 +-
 .../io/network/api/CheckpointBarrier.java       | 66 +++++++-------------
 .../io/network/api/CheckpointBarrierTest.java   | 40 ++++++------
 .../api/serialization/EventSerializerTest.java  | 13 ++--
 4 files changed, 46 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index cb98d10..676cf3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable {
 
 	private CheckpointOptions(
 			@Nonnull CheckpointType checkpointType,
-			String targetLocation) {
+			@Nullable  String targetLocation) {
 		this.checkpointType = checkNotNull(checkpointType);
 		this.targetLocation = targetLocation;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index a42c25d..97ad90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
-import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils;
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
-	private long id;
-	private long timestamp;
-	private CheckpointOptions checkpointOptions;
-
-	public CheckpointBarrier() {}
+	private final long id;
+	private final long timestamp;
+	private final CheckpointOptions checkpointOptions;
 
 	public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
 		this.id = id;
@@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent {
 	// ------------------------------------------------------------------------
 	// Serialization
 	// ------------------------------------------------------------------------
-	
+
+	//
+	//  These methods are inherited form the generic serialization of AbstractEvent
+	//  but would require the CheckpointBarrier to be mutable. Since all serialization
+	//  for events goes through the EventSerializer class, which has special serialization
+	//  for the CheckpointBarrier, we don't need these methods
+	// 
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-		out.writeLong(timestamp);
-		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-
-		out.writeInt(checkpointType.ordinal());
-
-		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-			return;
-		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = checkpointOptions.getTargetLocation();
-			checkState(targetLocation != null);
-			StringUtils.writeString(targetLocation, out);
-		} else {
-			throw new IOException("Unknown CheckpointType " + checkpointType);
-		}
+		throw new UnsupportedOperationException("This method should never be called");
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-		timestamp = in.readLong();
-
-		int typeOrdinal = in.readInt();
-		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
-		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
-
-		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-			checkpointOptions = CheckpointOptions.forFullCheckpoint();
-		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = StringUtils.readString(in);
-			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
-		} else {
-			throw new IOException("Illegal CheckpointType " + checkpointType);
-		}
+		throw new UnsupportedOperationException("This method should never be called");
 	}
 
-
 	// ------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
-		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+		return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32));
 	}
 
 	@Override
 	public boolean equals(Object other) {
-		if (other == null || !(other instanceof CheckpointBarrier)) {
+		if (other == this) {
+			return true;
+		}
+		else if (other == null || other.getClass() != CheckpointBarrier.class) {
 			return false;
 		}
 		else {
 			CheckpointBarrier that = (CheckpointBarrier) other;
-			return that.id == this.id && that.timestamp == this.timestamp;
+			return that.id == this.id && that.timestamp == this.timestamp &&
+					this.checkpointOptions.equals(that.checkpointOptions);
 		}
 	}
 
 	@Override
 	public String toString() {
-		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+		return String.format("CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointOptions);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ad9fc16..ba833c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,44 +18,40 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
+
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
+
 public class CheckpointBarrierTest {
 
 	/**
 	 * Test serialization of the checkpoint barrier.
+	 * The checkpoint barrier does not support its own serialization, in order to be immutable.
 	 */
 	@Test
 	public void testSerialization() throws Exception {
 		long id = Integer.MAX_VALUE + 123123L;
 		long timestamp = Integer.MAX_VALUE + 1228L;
 
-		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
-		testSerialization(id, timestamp, checkpoint);
-
-		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
-		testSerialization(id, timestamp, savepoint);
-	}
-
-	private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
 		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
 
-		DataOutputSerializer out = new DataOutputSerializer(1024);
-		barrier.write(out);
-
-		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
-		CheckpointBarrier deserialized = new CheckpointBarrier();
-		deserialized.read(in);
-
-		assertEquals(id, deserialized.getId());
-		assertEquals(timestamp, deserialized.getTimestamp());
-		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
-		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+		try {
+			barrier.write(new DataOutputSerializer(1024));
+			fail("should throw an exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
+
+		try {
+			barrier.read(new DataInputDeserializer(new byte[32]));
+			fail("should throw an exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index e674eb7..f51b083 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -147,15 +147,14 @@ public class EventSerializerTest {
 	 * 		thinks the encoded buffer matches the class
 	 * @throws IOException
 	 */
-	private final boolean checkIsEvent(final AbstractEvent event,
-		final Class<? extends AbstractEvent> eventClass) throws
-		IOException {
-		final Buffer serializedEvent =
-			EventSerializer.toBuffer(event);
+	private boolean checkIsEvent(
+			AbstractEvent event, 
+			Class<? extends AbstractEvent> eventClass) throws IOException {
+
+		final Buffer serializedEvent = EventSerializer.toBuffer(event);
 		try {
 			final ClassLoader cl = getClass().getClassLoader();
-			return EventSerializer
-				.isEvent(serializedEvent, eventClass, cl);
+			return EventSerializer.isEvent(serializedEvent, eventClass, cl);
 		} finally {
 			serializedEvent.recycle();
 		}


[3/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 46f228a..e407443 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// no call for a completed checkpoint must have happened
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+			any(CheckpointMetaData.class),
+			any(CheckpointOptions.class),
 			any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
@@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest {
 		// checkpoint 4 completed - check and validate buffered replay
 		check(sequence[9], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(
+			argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		check(sequence[10], buffer.getNextNonBlocked());
 		check(sequence[15], buffer.getNextNonBlocked());
@@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// only checkpoint 4 was successfully completed, not checkpoint 3
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+			argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
 		assertNull(buffer.getNextNonBlocked());
@@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest {
 	}
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static void check(BufferOrEvent expected, BufferOrEvent present) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 0cf866a..6e088f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest {
 
 			if (barrierGens[currentChannel].isNextBarrier()) {
 				return new BufferOrEvent(
-						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
 							currentChannel);
 			} else {
 				Buffer buffer = bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 869d1fe..d6056d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -566,7 +567,7 @@ public class BarrierBufferTest {
 			// checkpoint done - replay buffered
 			check(sequence[5], buffer.getNextNonBlocked());
 			validateAlignmentTime(startTs, buffer);
-			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			check(sequence[9], buffer.getNextNonBlocked());
@@ -1008,14 +1009,14 @@ public class BarrierBufferTest {
 
 		check(sequence[0], buffer.getNextNonBlocked());
 		check(sequence[2], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[6], buffer.getNextNonBlocked());
 		assertEquals(5L, buffer.getCurrentCheckpointId());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.getNextNonBlocked());
@@ -1078,7 +1079,7 @@ public class BarrierBufferTest {
 		check(sequence[2], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[5], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[6], buffer.getNextNonBlocked());
@@ -1097,7 +1098,7 @@ public class BarrierBufferTest {
 		check(sequence[16], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[20], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[21], buffer.getNextNonBlocked());
 
@@ -1114,7 +1115,7 @@ public class BarrierBufferTest {
 		// a simple successful checkpoint
 		startTs = System.nanoTime();
 		check(sequence[32], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[33], buffer.getNextNonBlocked());
 
@@ -1175,7 +1176,7 @@ public class BarrierBufferTest {
 
 		// finished first checkpoint
 		check(sequence[3], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[5], buffer.getNextNonBlocked());
@@ -1198,7 +1199,7 @@ public class BarrierBufferTest {
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// no further checkpoint (abort) notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// all done
@@ -1280,7 +1281,7 @@ public class BarrierBufferTest {
 		// checkpoint done
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		// queued data
 		check(sequence[10], buffer.getNextNonBlocked());
@@ -1299,7 +1300,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1364,7 +1365,7 @@ public class BarrierBufferTest {
 		// checkpoint finished
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		check(sequence[11], buffer.getNextNonBlocked());
 
 		// remaining data
@@ -1380,7 +1381,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1389,7 +1390,7 @@ public class BarrierBufferTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long checkpointId, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
@@ -1487,12 +1488,12 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("wrong checkpoint id",
 					nextExpectedCheckpointId == -1L || 
 					nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index da322f6..05f7da6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -470,7 +471,7 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long id, int channel) {
@@ -502,12 +503,12 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
 
 			final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5c0f0cf..51294ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -174,6 +175,11 @@ public class BlockingCheckpointsTest {
 		}
 
 		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 				Environment env, JobID jobID, String operatorIdentifier,
 				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
@@ -276,7 +282,7 @@ public class BlockingCheckpointsTest {
 
 		@Override
 		protected void run() throws Exception {
-			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
+			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 69c2c88..e22bf86 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 		// we should not yet see the barrier, only the two elements from non-blocked input
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 
@@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
@@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
@@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		while(!streamTask.triggerCheckpoint(checkpointMetaData));
+		while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()));
 
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0773699..1a6fa8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -231,7 +232,7 @@ public class SourceStreamTaskTest {
 			for (int i = 0; i < numCheckpoints; i++) {
 				long currentCheckpointId = checkpointId.getAndIncrement();
 				CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
-				sourceTask.triggerCheckpoint(checkpointMetaData);
+				sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 				Thread.sleep(checkpointInterval);
 			}
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index c2d4aaa..53f77ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest {
 		testHarness.invoke();
 
 		// tell the task to commence a checkpoint
-		boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()));
+		boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()),
+			CheckpointOptions.forFullCheckpoint());
 		assertFalse("task triggered checkpoint though not ready", result);
 
 		// a cancellation barrier should be downstream

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1e74c3e..3d01fdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger {
 
 		final Exception testException = new Exception("Test exception");
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
 
 		// mock the returned legacy snapshots
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
 		// set up the task
 
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
 		try {
-			streamTask.triggerCheckpoint(checkpointMetaData);
+			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 			fail("Expected test exception here.");
 		} catch (Exception e) {
 			assertEquals(testException, e.getCause());
@@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger {
 
 		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
 
 		// mock the legacy state snapshot
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
 		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
@@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
 
@@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		acknowledgeCheckpointLatch.await();
 
@@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		createSubtask.await();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index c0a1638..d465619 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// This element should be buffered since we received a checkpoint barrier on
 		// this input
@@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest {
 			expectedOutput,
 			testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 		testHarness.endInput();
 		testHarness.waitForTaskCompletion();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
@@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest {
 
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 01afec6..07424f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls {@link StreamOperator#snapshotState(long, long)}.
+	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}.
 	 */
 	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
-		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
+		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op");
+
+		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
+			checkpointId,
+			timestamp,
+			CheckpointOptions.forFullCheckpoint());
 
 		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
 		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index cde5780..effb44c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 
 		if (keyedStateBackend != null) {
-			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+					checkpointId,
 					timestamp,
-					streamFactory);
+					streamFactory,
+					CheckpointOptions.forFullCheckpoint());
 			if(!keyedSnapshotRunnable.isDone()) {
 				Thread runner = new Thread(keyedSnapshotRunnable);
 				runner.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 128522b..ac37009 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import java.io.FileNotFoundException;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
 			final File checkpointDir = new File(tmpDir, "checkpoints");
-			final File savepointDir = new File(tmpDir, "savepoints");
+			final File savepointRootDir = new File(tmpDir, "savepoints");
 
-			if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+			if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
 				fail("Test setup failed: failed to create temporary directories.");
 			}
 
 			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-			LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
+			LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
 
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-				savepointDir.toURI().toString());
+				savepointRootDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
@@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger {
 				.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
 			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
-			// Only one savepoint should exist
-			File[] files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals("Savepoint not created in expected directory", 1, files.length);
-			} else {
-				fail("Savepoint not created in expected directory");
-			}
-
 			// Retrieve the savepoint from the testing job manager
 			LOG.info("Requesting the savepoint.");
 			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
@@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
+			// Only one savepoint should exist
+			File[] files = savepointRootDir.listFiles();
+
+			if (files != null) {
+				assertEquals("Savepoint not created in expected directory", 1, files.length);
+				assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+
+				File savepointDir = files[0];
+				File[] savepointFiles = savepointDir.listFiles();
+				assertNotNull(savepointFiles);
+				assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+			} else {
+				fail("Savepoint not created in expected directory");
+			}
+
 			// Only one checkpoint of the savepoint should exist
 			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
-			files = checkpointDir.listFiles();
-			assertNotNull("Checkpoint directory empty", files);
-			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
-			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
+			File jobCheckpoints = new File(checkpointDir, jobId.toString());
+
+			if (jobCheckpoints.exists()) {
+				files = jobCheckpoints.listFiles();
+				assertNotNull("Checkpoint directory empty", files);
+				assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+			}
 
 			// Only one savepoint should exist
-			files = savepointDir.listFiles();
+			files = savepointRootDir.listFiles();
 			assertNotNull("Savepoint directory empty", files);
 			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
@@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger {
 
 			// All savepoints should have been cleaned up
 			errMsg = "Savepoints directory not cleaned up properly: " +
-				Arrays.toString(savepointDir.listFiles()) + ".";
-			assertEquals(errMsg, 0, savepointDir.listFiles().length);
+				Arrays.toString(savepointRootDir.listFiles()) + ".";
+			assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
 
 			// - Verification END ---------------------------------------------
 		} finally {
@@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger {
 				flink.submitJobAndWait(jobGraph, false);
 			} catch (Exception e) {
 				assertEquals(JobExecutionException.class, e.getClass());
-				assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+				assertEquals(FileNotFoundException.class, e.getCause().getClass());
 			}
 		} finally {
 			if (flink != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ec6a8f5..79665dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -86,7 +86,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-
 	public static class FailingStateBackend extends AbstractStateBackend {
 		private static final long serialVersionUID = 1L;
 
@@ -97,6 +96,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId,
+			String operatorIdentifier, String targetLocation) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 				Environment env,
 				JobID jobID,


[4/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c2ada3b..d8e46fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest {
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// check that the vertices received the trigger checkpoint message
-			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
-			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
+			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
@@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest {
 
 			// check that the vertices received the trigger checkpoint message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			// check that the vertices received the trigger checkpoint message for the second checkpoint
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest {
 
 			// check that the vertices received the trigger checkpoint message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			// acknowledge from one of the tasks
@@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest {
 
 			// validate that the relevant tasks got a confirmation message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
@@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest {
 
 			// validate that the relevant tasks got a confirmation message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
 
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 
 			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
 
@@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest {
 			CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
@@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 
 			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
 
@@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId2 = pending2.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
 
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
@@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest {
 					numCalls.incrementAndGet();
 					return null;
 				}
-			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
+			}).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
@@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest {
 				triggerCalls.add((Long) invocation.getArguments()[0]);
 				return null;
 			}
-		}).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
+		}).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 		final long delay = 50;
 
@@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture.isDone());
 
 		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
 
@@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest {
 
 		// validate that the relevant tasks got a confirmation message
 		{
-			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-			verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+			verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
 
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest {
 					numCalls.incrementAndGet();
 					return null;
 				}
-			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
+			}).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			doAnswer(new Answer<Void>() {
 				@Override
@@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(maxConcurrentAttempts, numCalls.get());
 
 			verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
-					.triggerCheckpoint(anyLong(), anyLong());
+					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
new file mode 100644
index 0000000..6788338
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.junit.Test;
+
+public class CheckpointOptionsTest {
+
+	@Test
+	public void testFullCheckpoint() throws Exception {
+		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
+		assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType());
+		assertNull(options.getTargetLocation());
+	}
+
+	@Test
+	public void testSavepoint() throws Exception {
+		String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd";
+		CheckpointOptions options = CheckpointOptions.forSavepoint(location);
+		assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType());
+		assertEquals(location, options.getTargetLocation());
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testSavepointNullCheck() throws Exception {
+		CheckpointOptions.forSavepoint(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 3c373f1..95a31d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest {
 		when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
 		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(completed.getCheckpointId()).thenReturn(checkpointId);
+		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		return completed;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 512768d..6ab8620 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -183,7 +183,7 @@ public class MigrationV0ToV1Test {
 
 		} finally {
 			// Dispose
-			SavepointStore.removeSavepoint(path.toString());
+			SavepointStore.removeSavepointFile(path.toString());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6471d6f..c66b29d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -64,12 +64,12 @@ public class SavepointLoaderTest {
 		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
 		taskStates.put(vertexId, state);
 
+		JobID jobId = new JobID();
+
 		// Store savepoint
 		SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
 		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
-		JobID jobId = new JobID();
-
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 3398341..dc19e47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import java.io.File;
+import java.util.Arrays;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -38,6 +41,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -54,14 +58,22 @@ public class SavepointStoreTest {
 	 */
 	@Test
 	public void testStoreLoadDispose() throws Exception {
-		String target = tmp.getRoot().getAbsolutePath();
+		String root = tmp.getRoot().getAbsolutePath();
+		File rootFile = new File(root);
 
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		File[] list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(0, list.length);
 
 		// Store
+		String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
 		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
-		String path = SavepointStore.storeSavepoint(target, stored);
-		assertEquals(1, tmp.getRoot().listFiles().length);
+		String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
+
+		list = rootFile.listFiles();
+		assertNotNull(list);
+		assertEquals(1, list.length);
 
 		// Load
 		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
@@ -70,9 +82,11 @@ public class SavepointStoreTest {
 		loaded.dispose();
 
 		// Dispose
-		SavepointStore.removeSavepoint(path);
+		SavepointStore.deleteSavepointDirectory(path);
 
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		list = rootFile.listFiles();
+		assertNotNull(list);
+		assertEquals(0, list.length);
 	}
 
 	/**
@@ -108,8 +122,8 @@ public class SavepointStoreTest {
 
 		assertTrue(serializers.size() >= 1);
 
-		String target = tmp.getRoot().getAbsolutePath();
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		String root = tmp.getRoot().getAbsolutePath();
+		File rootFile = new File(root);
 
 		// New savepoint type for test
 		int version = ThreadLocalRandom.current().nextInt();
@@ -118,14 +132,24 @@ public class SavepointStoreTest {
 		// Add serializer
 		serializers.put(version, NewSavepointSerializer.INSTANCE);
 
+		String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
 		TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
-		String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint);
-		assertEquals(1, tmp.getRoot().listFiles().length);
+		String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
+
+		File[] list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(1, list.length);
 
 		// Savepoint v0
+		String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
 		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
-		String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint);
-		assertEquals(2, tmp.getRoot().listFiles().length);
+		String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
+
+		list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(2, list.length);
 
 		// Load
 		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
new file mode 100644
index 0000000..dd5b0b6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class CheckpointBarrierTest {
+
+	/**
+	 * Test serialization of the checkpoint barrier.
+	 */
+	@Test
+	public void testSerialization() throws Exception {
+		long id = Integer.MAX_VALUE + 123123L;
+		long timestamp = Integer.MAX_VALUE + 1228L;
+
+		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+		testSerialization(id, timestamp, checkpoint);
+
+		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+		testSerialization(id, timestamp, savepoint);
+	}
+
+	private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+
+		DataOutputSerializer out = new DataOutputSerializer(1024);
+		barrier.write(out);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+		CheckpointBarrier deserialized = new CheckpointBarrier();
+		deserialized.read(in);
+
+		assertEquals(id, deserialized.getId());
+		assertEquals(timestamp, deserialized.getTimestamp());
+		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 271d0d2..e674eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -25,25 +33,42 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
 import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
+public class EventSerializerTest {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+	@Test
+	public void testCheckpointBarrierSerialization() throws Exception {
+		long id = Integer.MAX_VALUE + 123123L;
+		long timestamp = Integer.MAX_VALUE + 1228L;
 
-public class EventSerializerTest {
+		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+		testCheckpointBarrierSerialization(id, timestamp, checkpoint);
+
+		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+		testCheckpointBarrierSerialization(id, timestamp, savepoint);
+	}
+
+	private void testCheckpointBarrierSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+		ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier);
+		CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl);
+		assertFalse(serialized.hasRemaining());
+
+		assertEquals(id, deserialized.getId());
+		assertEquals(timestamp, deserialized.getTimestamp());
+		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+	}
 
 	@Test
 	public void testSerializeDeserializeEvent() throws Exception {
 		AbstractEvent[] events = {
 				EndOfPartitionEvent.INSTANCE,
 				EndOfSuperstepEvent.INSTANCE,
-				new CheckpointBarrier(1678L, 4623784L),
+				new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
 				new TestTaskEvent(Math.random(), 12361231273L),
 				new CancelCheckpointMarker(287087987329842L)
 		};
@@ -94,7 +119,7 @@ public class EventSerializerTest {
 		AbstractEvent[] events = {
 			EndOfPartitionEvent.INSTANCE,
 			EndOfSuperstepEvent.INSTANCE,
-			new CheckpointBarrier(1678L, 4623784L),
+			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
 			new TestTaskEvent(Math.random(), 12361231273L),
 			new CancelCheckpointMarker(287087987329842L)
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63175ed..900b5c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -327,7 +328,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L);
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint());
 
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
@@ -363,7 +364,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L);
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint());
 
 		// Emit records on some channels first (requesting buffers), then
 		// broadcast the event. The record buffers should be emitted first, then

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index de54d1f..5a38be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare(
 					String.valueOf(UUID.randomUUID()),
 					InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
@@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index db45231..bc420cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -49,7 +50,7 @@ public class CheckpointMessagesTest {
 			NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
 			testSerializabilityEqualsHashCode(cc);
 
-			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint());
 			testSerializabilityEqualsHashCode(tc);
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..94df524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -165,7 +166,7 @@ public class OperatorStateBackendTest {
 		listState3.add(20);
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
-		OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get();
+		OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
 
 		try {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b0350d..f2416b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("u3");
 
 		// draw another snapshot
-		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(13, (int) state2.value());
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(
@@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(42L, (long) state.value());
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("u3");
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("u3");
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add(103);
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("ShouldBeInSecondHalf");
 
 
-		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory));
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles(
 				Collections.singletonList(snapshot),
@@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.update("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.put("2", "Second");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 
@@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
 			assertNull(snapshot);
 			backend.dispose();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
new file mode 100644
index 0000000..a29d29c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FsSavepointStreamFactoryTest {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	/**
+	 * Tests that the factory creates all files in the given directory without
+	 * creating any sub directories.
+	 */
+	@Test
+	public void testSavepointStreamDirectoryLayout() throws Exception {
+		File testRoot = folder.newFolder();
+		JobID jobId = new JobID();
+
+		FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory(
+				new Path(testRoot.getAbsolutePath()),
+				jobId,
+				0);
+
+		File[] listed = testRoot.listFiles();
+		assertNotNull(listed);
+		assertEquals(0, listed.length);
+
+		FsCheckpointStateOutputStream stream = savepointStreamFactory
+			.createCheckpointStateOutputStream(1273, 19231);
+
+		stream.write(1);
+
+		FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+		listed = testRoot.listFiles();
+		assertNotNull(listed);
+		assertEquals(1, listed.length);
+		assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 187163d..89ae5da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -91,7 +92,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 			
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L);
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
 			}
 			
 			triggerLatch.await();
@@ -121,7 +122,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L);
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
 				task.notifyCheckpointComplete(i);
 			}
 
@@ -226,7 +227,7 @@ public class TaskAsyncCallTest {
 		public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
 			lastCheckpointId++;
 			if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
 				if (lastCheckpointId == NUM_CALLS) {
@@ -243,7 +244,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 144247f..05fda28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.IOException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
@@ -36,6 +37,8 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
+	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
 		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
 
+		CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
+
 		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 				checkpointId,
 				timestamp,
-				checkpointStreamFactory,
+				factory,
 				keyGroupRange,
 				getContainingTask().getCancelables())) {
 
@@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT>
 
 			if (null != operatorStateBackend) {
 				snapshotInProgress.setOperatorStateManagedFuture(
-					operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+					operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
 			}
 
 			if (null != keyedStateBackend) {
 				snapshotInProgress.setKeyedStateManagedFuture(
-					keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+					keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
 			}
 		} catch (Exception snapshotException) {
 			try {
@@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT>
 	@SuppressWarnings("deprecation")
 	@Deprecated
 	@Override
-	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 		if (this instanceof StreamCheckpointedOperator) {
+			CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
 
 			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+				factory.createCheckpointStateOutputStream(checkpointId, timestamp);
 
 			getContainingTask().getCancelables().registerClosable(outStream);
 
@@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT>
 	@Override
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {}
 
+	/**
+	 * Returns a checkpoint stream factory for the provided options.
+	 *
+	 * <p>For {@link CheckpointType#FULL_CHECKPOINT} this returns the shared
+	 * factory of this operator.
+	 *
+	 * <p>For {@link CheckpointType#SAVEPOINT} it creates a custom factory per
+	 * savepoint.
+	 *
+	 * @param checkpointOptions Options for the checkpoint
+	 * @return Checkpoint stream factory for the checkpoints
+	 * @throws IOException Failures while creating a new stream factory are forwarded
+	 */
+	@VisibleForTesting
+	CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
+		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			return checkpointStreamFactory;
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());
+		} else {
+			throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties and Services
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..83697ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * Result of {@link AbstractStreamOperator#snapshotState}.
+ * Result of {@link StreamOperator#snapshotState}.
  */
 public class OperatorSnapshotResult {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d8e4d08..006e910 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * 
 	 * @throws Exception exception that happened during snapshotting.
 	 */
-	OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+	OperatorSnapshotResult snapshotState(
+		long checkpointId,
+		long timestamp,
+		CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
@@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable {
 	 */
 	@SuppressWarnings("deprecation")
 	@Deprecated
-	StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
+	StreamStateHandle snapshotLegacyOperatorState(
+		long checkpointId,
+		long timestamp,
+		CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Provides state handles to restore the operator state.
@@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable {
 	void setChainingStrategy(ChainingStrategy strategy);
 
 	MetricGroup getMetricGroup();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 611bd44..2da8389 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -368,7 +368,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					.setBytesBufferedInAlignment(bytesBuffered)
 					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+				checkpointMetaData,
+				checkpointBarrier.getCheckpointOptions(),
+				checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 77608c6..8b1b65b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -132,7 +133,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
+			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
 			return;
 		}
 
@@ -170,7 +171,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 						LOG.debug("Received all barriers for checkpoint {}", barrierId);
 					}
 
-					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
 				}
 			}
 		}
@@ -248,14 +249,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+	private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 				.setBytesBufferedInAlignment(0L)
 				.setAlignmentDurationNanos(0L);
 
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4f07182..dd93592 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
-	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+	public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
 		try {
-			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
 			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
 				streamOutput.broadcastEvent(barrier);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 62cfb8f..938ffd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private TaskStateHandles restoreStateHandles;
 
-
 	/** The currently active background materialization threads */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
@@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 		try {
 			// No alignment if we inject a checkpoint
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 					.setBytesBufferedInAlignment(0L)
 					.setAlignmentDurationNanos(0L);
 
-			return performCheckpoint(checkpointMetaData, checkpointMetrics);
+			return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+	public void triggerCheckpointOnBarrier(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
 		try {
-			performCheckpoint(checkpointMetaData, checkpointMetrics);
+			performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 		catch (CancelTaskException e) {
 			throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+	private boolean performCheckpoint(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
+		LOG.debug("Starting checkpoint ({}) {} on task {}",
+			checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
 
 		synchronized (lock) {
 			if (isRunning) {
@@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
 				// can start their checkpoint work as soon as possible
 				operatorChain.broadcastCheckpointBarrier(
-						checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
+						checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions);
 
-				checkpointState(checkpointMetaData, checkpointMetrics);
+				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 				return true;
 			}
 			else {
@@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
-		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
+	private void checkpointState(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
+		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
+			this,
+			checkpointMetaData,
+			checkpointOptions,
+			checkpointMetrics);
+
 		checkpointingOperation.executeCheckpointing();
 	}
 
@@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return stateBackend.createStreamFactory(
 				getEnvironment().getJobID(),
 				createOperatorIdentifier(operator, configuration.getVertexID()));
+	}
 
+	public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
+		return stateBackend.createSavepointStreamFactory(
+			getEnvironment().getJobID(),
+			createOperatorIdentifier(operator, configuration.getVertexID()),
+			targetLocation);
 	}
 
 	private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
@@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final StreamTask<?, ?> owner;
 
 		private final CheckpointMetaData checkpointMetaData;
+		private final CheckpointOptions checkpointOptions;
 		private final CheckpointMetrics checkpointMetrics;
 
 		private final StreamOperator<?>[] allOperators;
@@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final List<StreamStateHandle> nonPartitionedStates;
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
-		public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
+		public CheckpointingOperation(
+				StreamTask<?, ?> owner,
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+
 			this.owner = Preconditions.checkNotNull(owner);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+			this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
 			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
 			this.allOperators = owner.operatorChain.getAllOperators();
 			this.nonPartitionedStates = new ArrayList<>(allOperators.length);
@@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		@SuppressWarnings("deprecation")
 		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 			if (null != op) {
-				// first call the legacy checkpoint code paths 
+				// first call the legacy checkpoint code paths
 				nonPartitionedStates.add(op.snapshotLegacyOperatorState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp()));
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions));
 
 				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp());
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions);
 
 				snapshotInProgressList.add(snapshotInProgress);
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 6751617..51b9d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -100,4 +100,4 @@ public class ListCheckpointedTest {
 			return restored;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 274611a..8507200 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -17,12 +17,34 @@
  */
 package org.apache.flink.streaming.api.operators;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
 /**
  * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
  * tests timers and state and whether they are correctly checkpointed/restored
@@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
-		operator.snapshotState(checkpointId, timestamp);
+		operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 
 		verify(context).close();
 	}
@@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		// lets fail when calling the actual snapshotState method
 		doThrow(failingException).when(operator).snapshotState(eq(context));
 
 		try {
-			operator.snapshotState(checkpointId, timestamp);
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
@@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+
+		// The amount of mocking in this test makes it necessary to make the
+		// getCheckpointStreamFactory method visible for the test and to
+		// overwrite its behaviour.
+		when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
+
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
 
 		OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
-		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
 		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
-		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
 
 		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
 		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
 		Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
 
 		try {
-			operator.snapshotState(checkpointId, timestamp);
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index c4ddea8..d331171 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
 			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
 			"org.apache.flink.streaming.api.operators.Output], " +
-			"snapshotLegacyOperatorState[long, long], " +
-			"snapshotState[long, long]]";
+			"snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
+			"snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
 
 	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
 			", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 						try {
 							runStarted.await();
 							if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
-									new CheckpointMetaData(0, System.currentTimeMillis()))) {
+									new CheckpointMetaData(0, System.currentTimeMillis()),
+									CheckpointOptions.forFullCheckpoint())) {
 								LifecycleTrackingStreamSource.runFinish.trigger();
 							}
 						} catch (Exception e) {
@@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
-		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
-			return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+			return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b1689f9..ab4258f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest {
 			return value;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 907f8f1..c4867ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		task.triggerCheckpoint(checkpointMetaData);
+		task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		env.getCheckpointLatch().await();
 
@@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
 
 		// trigger the checkpoint while processing stream elements
-		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp));
+		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint());
 
 		restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
 


[2/7] flink git commit: [FLINK-5763] [checkpoints] Followup on adding CheckpointOptions

Posted by se...@apache.org.
[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions

  - Add a test that validates the checkpoint type ordinals are not changed
  - Change target location writing from 'writeUtf' to 'StringUtils.write'.
  - Pull out the coding charset as a constant in 'EventSerializer'
  - Simplify the directory creation in 'SavepointStore'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b

Branch: refs/heads/master
Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5
Parents: 6e7a917
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 14:11:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../flink/core/io/IOReadableWritable.java       |  7 +-
 .../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++--
 .../checkpoint/savepoint/SavepointStore.java    | 70 ++++++++++----------
 .../io/network/api/CheckpointBarrier.java       | 16 +++--
 .../api/serialization/EventSerializer.java      | 17 ++---
 .../runtime/util/DataInputDeserializer.java     |  6 +-
 .../runtime/checkpoint/CheckpointTypeTest.java  | 42 ++++++++++++
 .../io/network/api/CheckpointBarrierTest.java   |  2 +-
 8 files changed, 145 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index a192a21..a38952e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * This interface must be implemented by every class whose objects have to be serialized to their binary representation
  * and vice-versa. In particular, records have to implement this interface in order to specify how their data can be
- * transfered
- * to a binary representation.
- * When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor!
+ * transferred to a binary representation.
+ * 
+ * <p>When implementing this Interface make sure that the implementing class has a default
+ * (zero-argument) constructor!
  */
 @Public
 public interface IOReadableWritable {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 3c32d77..fc945c6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utility class to convert objects into strings in vice-versa.
  */
@@ -302,19 +307,46 @@ public final class StringUtils {
 		}
 		return new String(data);
 	}
-	
+
+	/**
+	 * Writes a String to the given output.
+	 * The written string can be read with {@link #readNullableString(DataInputView)}.
+	 *
+	 * @param str The string to write
+	 * @param out The output to write to
+	 *   
+	 * @throws IOException Thrown, if the writing or the serialization fails.
+	 */
+	public static void writeString(@Nonnull String str, DataOutputView out) throws IOException {
+		checkNotNull(str);
+		StringValue.writeString(str, out);
+	}
+
+	/**
+	 * Reads a non-null String from the given input.
+	 *
+	 * @param in The input to read from
+	 * @return The deserialized String
+	 * 
+	 * @throws IOException Thrown, if the reading or the deserialization fails.
+	 */
+	public static String readString(DataInputView in) throws IOException {
+		return StringValue.readString(in);
+	}
+
 	/**
 	 * Writes a String to the given output. The string may be null.
 	 * The written string can be read with {@link #readNullableString(DataInputView)}-
 	 * 
 	 * @param str The string to write, or null.
 	 * @param out The output to write to.
-	 * @throws IOException Throws if the writing or the serialization fails.
+	 * 
+	 * @throws IOException Thrown, if the writing or the serialization fails.
 	 */
-	public static void writeNullableString(String str, DataOutputView out) throws IOException {
+	public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException {
 		if (str != null) {
 			out.writeBoolean(true);
-			StringValue.writeString(str, out);
+			writeString(str, out);
 		} else {
 			out.writeBoolean(false);
 		}
@@ -326,11 +358,12 @@ public final class StringUtils {
 	 * 
 	 * @param in The input to read from.
 	 * @return The deserialized string, or null.
-	 * @throws IOException Throws if the reading or the deserialization fails.
+	 * 
+	 * @throws IOException Thrown, if the reading or the deserialization fails.
 	 */
-	public static String readNullableString(DataInputView in) throws IOException {
+	public static @Nullable String readNullableString(DataInputView in) throws IOException {
 		if (in.readBoolean()) {
-			return StringValue.readString(in);
+			return readString(in);
 		} else {
 			return null;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 0caf5b2..95370a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,24 +18,27 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utilities for storing and loading savepoint meta data files.
  *
@@ -65,7 +68,10 @@ public class SavepointStore {
 	 * @throws IOException FileSystem operation failures are forwarded
 	 */
 	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
-		String prefix;
+		final Path basePath = new Path(baseDirectory);
+		final FileSystem fs = basePath.getFileSystem();
+
+		final String prefix;
 		if (jobId == null) {
 			prefix = "savepoint-";
 		} else {
@@ -73,33 +79,21 @@ public class SavepointStore {
 		}
 
 		Exception latestException = null;
-		Path savepointDirectory = null;
-
-		FileSystem fs = null;
 
 		// Try to create a FS output stream
 		for (int attempt = 0; attempt < 10; attempt++) {
-			Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
-
-			if (fs == null) {
-				fs = FileSystem.get(path.toUri());
-			}
+			Path path = new Path(basePath, FileUtils.getRandomFilename(prefix));
 
 			try {
 				if (fs.mkdirs(path)) {
-					savepointDirectory = path;
-					break;
+					return path.toString();
 				}
 			} catch (Exception e) {
 				latestException = e;
 			}
 		}
 
-		if (savepointDirectory == null) {
-			throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
-		} else {
-			return savepointDirectory.getPath();
-		}
+		throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
 	}
 
 	/**
@@ -121,20 +115,22 @@ public class SavepointStore {
 	 * @param directory Target directory to store savepoint in
 	 * @param savepoint Savepoint to be stored
 	 * @return Path of stored savepoint
-	 * @throws Exception Failures during store are forwarded
+	 * @throws IOException Failures during store are forwarded
 	 */
 	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
 		checkNotNull(directory, "Target directory");
 		checkNotNull(savepoint, "Savepoint");
 
-		Path basePath = new Path(directory);
-		FileSystem fs = FileSystem.get(basePath.toUri());
+		final Path basePath = new Path(directory);
+		final Path metadataFilePath = new Path(basePath, META_DATA_FILE);
 
-		Path path = new Path(basePath, META_DATA_FILE);
-		FSDataOutputStream fdos = fs.create(path, false);
+		final FileSystem fs = FileSystem.get(basePath.toUri());
 
 		boolean success = false;
-		try (DataOutputStream dos = new DataOutputStream(fdos)) {
+		try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); 
+				DataOutputStream dos = new DataOutputStream(fdos))
+		{
+
 			// Write header
 			dos.writeInt(MAGIC_NUMBER);
 			dos.writeInt(savepoint.getVersion());
@@ -143,14 +139,18 @@ public class SavepointStore {
 			SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint);
 			serializer.serialize(savepoint, dos);
 			success = true;
-		} finally {
-			if (!success && fs.exists(path)) {
-				if (!fs.delete(path, true)) {
-					LOG.warn("Failed to delete file {} after failed write.", path);
+		}
+		finally {
+			if (!success && fs.exists(metadataFilePath)) {
+				if (!fs.delete(metadataFilePath, true)) {
+					LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath);
 				}
 			}
 		}
 
+		// we return the savepoint directory path here!
+		// The directory path also works to resume from and is more elegant than the direct
+		// metadata file pointer
 		return basePath.toString();
 	}
 
@@ -159,7 +159,7 @@ public class SavepointStore {
 	 *
 	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
 	 * @return The loaded savepoint
-	 * @throws Exception Failures during load are forwared
+	 * @throws IOException Failures during load are forwarded
 	 */
 	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
 		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
@@ -207,7 +207,7 @@ public class SavepointStore {
 	 * Removes the savepoint meta data w/o loading and disposing it.
 	 *
 	 * @param path Path of savepoint to remove
-	 * @throws Exception Failures during disposal are forwarded
+	 * @throws IOException Failures during disposal are forwarded
 	 */
 	public static void removeSavepointFile(String path) throws IOException {
 		Preconditions.checkNotNull(path, "Path");

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 0752897..a42c25d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
+import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
@@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent;
  * 
  * <p>Once an operator has received a checkpoint barrier from all its input channels, it
  * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
- * behavior and broadcast the barrier to downstream operators.</p>
+ * behavior and broadcast the barrier to downstream operators.
  * 
  * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
- * is complete (exactly once)</p>
+ * is complete (exactly once).
  * 
- * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
@@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent {
 			return;
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
 			String targetLocation = checkpointOptions.getTargetLocation();
-			assert(targetLocation != null);
-			out.writeUTF(targetLocation);
+			checkState(targetLocation != null);
+			StringUtils.writeString(targetLocation, out);
 		} else {
 			throw new IOException("Unknown CheckpointType " + checkpointType);
 		}
@@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent {
 		timestamp = in.readLong();
 
 		int typeOrdinal = in.readInt();
-		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal");
 		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
 
 		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
 			checkpointOptions = CheckpointOptions.forFullCheckpoint();
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
-			String targetLocation = in.readUTF();
+			String targetLocation = StringUtils.readString(in);
 			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
 		} else {
 			throw new IOException("Illegal CheckpointType " + checkpointType);

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 223cbfe..3adf864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class EventSerializer {
 
+	private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8");
+
 	private static final int END_OF_PARTITION_EVENT = 0;
 
 	private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -77,16 +79,16 @@ public class EventSerializer {
 			} else if (checkpointType == CheckpointType.SAVEPOINT) {
 				String targetLocation = checkpointOptions.getTargetLocation();
 				assert(targetLocation != null);
-				byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+				byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET);
 
-				buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+				buf = ByteBuffer.allocate(24 + 4 + locationBytes.length);
 				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
 				buf.putLong(4, barrier.getId());
 				buf.putLong(12, barrier.getTimestamp());
 				buf.putInt(20, checkpointType.ordinal());
-				buf.putInt(24, bytes.length);
-				for (int i = 0; i < bytes.length; i++) {
-					buf.put(28 + i, bytes[i]);
+				buf.putInt(24, locationBytes.length);
+				for (int i = 0; i < locationBytes.length; i++) {
+					buf.put(28 + i, locationBytes[i]);
 				}
 			} else {
 				throw new IOException("Unknown checkpoint type: " + checkpointType);
@@ -204,8 +206,7 @@ public class EventSerializer {
 				CheckpointOptions checkpointOptions;
 
 				int checkpointTypeOrdinal = buffer.getInt();
-				Preconditions.checkElementIndex(type, CheckpointType.values().length,
-					"Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+				Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
 				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
 
 				if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
@@ -214,7 +215,7 @@ public class EventSerializer {
 					int len = buffer.getInt();
 					byte[] bytes = new byte[len];
 					buffer.get(bytes);
-					String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+					String targetLocation = new String(bytes, STRING_CODING_CHARSET);
 
 					checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
 				} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 0f99496..4e8871a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl
 	// ------------------------------------------------------------------------
 	
 	public DataInputDeserializer() {}
-	
+
+	public DataInputDeserializer(byte[] buffer) {
+		setBuffer(buffer, 0, buffer.length);
+	}
+
 	public DataInputDeserializer(byte[] buffer, int start, int len) {
 		setBuffer(buffer, start, len);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
new file mode 100644
index 0000000..dfbde5e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTypeTest {
+
+	/**
+	 * This test validates that the order of enumeration constants is not changed, because the
+	 * ordinal of that enum is used in serialization.
+	 *
+	 * <p>It is still possible to edit both the ordinal and this test, but the test adds
+	 * a level of safety, and should make developers stumble over this when attempting
+	 * to adjust the enumeration.
+	 */
+	@Test
+	public void testOrdinalsAreConstant() {
+		assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+		assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index dd5b0b6..ad9fc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -49,7 +49,7 @@ public class CheckpointBarrierTest {
 		DataOutputSerializer out = new DataOutputSerializer(1024);
 		barrier.write(out);
 
-		DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
 		CheckpointBarrier deserialized = new CheckpointBarrier();
 		deserialized.read(in);