You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:32 UTC

[4/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c2ada3b..d8e46fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -313,8 +313,8 @@ public class CheckpointCoordinatorTest {
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// check that the vertices received the trigger checkpoint message
-			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
-			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp);
+			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
 
@@ -428,14 +428,14 @@ public class CheckpointCoordinatorTest {
 
 			// check that the vertices received the trigger checkpoint message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint1Id), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			// check that the vertices received the trigger checkpoint message for the second checkpoint
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpoint2Id), eq(timestamp + 2), any(CheckpointOptions.class));
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
@@ -529,8 +529,8 @@ public class CheckpointCoordinatorTest {
 
 			// check that the vertices received the trigger checkpoint message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			// acknowledge from one of the tasks
@@ -558,8 +558,8 @@ public class CheckpointCoordinatorTest {
 
 			// validate that the relevant tasks got a confirmation message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
 			}
 
 			CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
@@ -589,8 +589,8 @@ public class CheckpointCoordinatorTest {
 
 			// validate that the relevant tasks got a confirmation message
 			{
-				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+				verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
 
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
 				verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -660,8 +660,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 
 			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
 
@@ -687,8 +687,8 @@ public class CheckpointCoordinatorTest {
 			CheckpointMetaData checkpointMetaData2 = new CheckpointMetaData(checkpointId2, 0L);
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
 
 			// we acknowledge the remaining two tasks from the first
 			// checkpoint and two tasks from the second checkpoint
@@ -794,8 +794,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId1 = pending1.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 
 			CheckpointMetaData checkpointMetaData1 = new CheckpointMetaData(checkpointId1, 0L);
 
@@ -819,8 +819,8 @@ public class CheckpointCoordinatorTest {
 			long checkpointId2 = pending2.getCheckpointId();
 
 			// trigger messages should have been sent
-			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
-			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2));
+			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
+			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
 
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
@@ -1142,7 +1142,7 @@ public class CheckpointCoordinatorTest {
 					numCalls.incrementAndGet();
 					return null;
 				}
-			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
+			}).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
 				jid,
@@ -1232,7 +1232,7 @@ public class CheckpointCoordinatorTest {
 				triggerCalls.add((Long) invocation.getArguments()[0]);
 				return null;
 			}
-		}).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong());
+		}).when(executionAttempt).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 		final long delay = 50;
 
@@ -1398,7 +1398,6 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture.isDone());
 
 		long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
-		CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
 
@@ -1414,8 +1413,8 @@ public class CheckpointCoordinatorTest {
 
 		// validate that the relevant tasks got a confirmation message
 		{
-			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
-			verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew));
+			verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
+			verify(vertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
 
 			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
 			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointIdNew), eq(timestampNew));
@@ -1537,7 +1536,7 @@ public class CheckpointCoordinatorTest {
 					numCalls.incrementAndGet();
 					return null;
 				}
-			}).when(execution).triggerCheckpoint(anyLong(), anyLong());
+			}).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			doAnswer(new Answer<Void>() {
 				@Override
@@ -1578,7 +1577,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(maxConcurrentAttempts, numCalls.get());
 
 			verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
-					.triggerCheckpoint(anyLong(), anyLong());
+					.triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
 
 			// now, once we acknowledge one checkpoint, it should trigger the next one
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
new file mode 100644
index 0000000..6788338
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.junit.Test;
+
+public class CheckpointOptionsTest {
+
+	@Test
+	public void testFullCheckpoint() throws Exception {
+		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
+		assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType());
+		assertNull(options.getTargetLocation());
+	}
+
+	@Test
+	public void testSavepoint() throws Exception {
+		String location = "asdasdadasdasdja7931481398123123123kjhasdkajsd";
+		CheckpointOptions options = CheckpointOptions.forSavepoint(location);
+		assertEquals(CheckpointType.SAVEPOINT, options.getCheckpointType());
+		assertEquals(location, options.getTargetLocation());
+	}
+
+	@Test(expected = NullPointerException.class)
+	public void testSavepointNullCheck() throws Exception {
+		CheckpointOptions.forSavepoint(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 3c373f1..95a31d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -184,6 +184,7 @@ public class CheckpointStatsHistoryTest {
 		when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
 		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		when(completed.getCheckpointId()).thenReturn(checkpointId);
+		when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
 		return completed;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 512768d..6ab8620 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -183,7 +183,7 @@ public class MigrationV0ToV1Test {
 
 		} finally {
 			// Dispose
-			SavepointStore.removeSavepoint(path.toString());
+			SavepointStore.removeSavepointFile(path.toString());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6471d6f..c66b29d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -64,12 +64,12 @@ public class SavepointLoaderTest {
 		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
 		taskStates.put(vertexId, state);
 
+		JobID jobId = new JobID();
+
 		// Store savepoint
 		SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
 		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
-		JobID jobId = new JobID();
-
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 3398341..dc19e47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import java.io.File;
+import java.util.Arrays;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -38,6 +41,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -54,14 +58,22 @@ public class SavepointStoreTest {
 	 */
 	@Test
 	public void testStoreLoadDispose() throws Exception {
-		String target = tmp.getRoot().getAbsolutePath();
+		String root = tmp.getRoot().getAbsolutePath();
+		File rootFile = new File(root);
 
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		File[] list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(0, list.length);
 
 		// Store
+		String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
 		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
-		String path = SavepointStore.storeSavepoint(target, stored);
-		assertEquals(1, tmp.getRoot().listFiles().length);
+		String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
+
+		list = rootFile.listFiles();
+		assertNotNull(list);
+		assertEquals(1, list.length);
 
 		// Load
 		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
@@ -70,9 +82,11 @@ public class SavepointStoreTest {
 		loaded.dispose();
 
 		// Dispose
-		SavepointStore.removeSavepoint(path);
+		SavepointStore.deleteSavepointDirectory(path);
 
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		list = rootFile.listFiles();
+		assertNotNull(list);
+		assertEquals(0, list.length);
 	}
 
 	/**
@@ -108,8 +122,8 @@ public class SavepointStoreTest {
 
 		assertTrue(serializers.size() >= 1);
 
-		String target = tmp.getRoot().getAbsolutePath();
-		assertEquals(0, tmp.getRoot().listFiles().length);
+		String root = tmp.getRoot().getAbsolutePath();
+		File rootFile = new File(root);
 
 		// New savepoint type for test
 		int version = ThreadLocalRandom.current().nextInt();
@@ -118,14 +132,24 @@ public class SavepointStoreTest {
 		// Add serializer
 		serializers.put(version, NewSavepointSerializer.INSTANCE);
 
+		String savepointDirectory1 = SavepointStore.createSavepointDirectory(root, new JobID());
 		TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
-		String pathNewSavepoint = SavepointStore.storeSavepoint(target, newSavepoint);
-		assertEquals(1, tmp.getRoot().listFiles().length);
+		String pathNewSavepoint = SavepointStore.storeSavepoint(savepointDirectory1, newSavepoint);
+
+		File[] list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(1, list.length);
 
 		// Savepoint v0
+		String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
 		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
-		String pathSavepoint = SavepointStore.storeSavepoint(target, savepoint);
-		assertEquals(2, tmp.getRoot().listFiles().length);
+		String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
+
+		list = rootFile.listFiles();
+
+		assertNotNull(list);
+		assertEquals(2, list.length);
 
 		// Load
 		Savepoint loaded = SavepointStore.loadSavepoint(pathNewSavepoint, Thread.currentThread().getContextClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
new file mode 100644
index 0000000..dd5b0b6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.junit.Test;
+
+public class CheckpointBarrierTest {
+
+	/**
+	 * Test serialization of the checkpoint barrier.
+	 */
+	@Test
+	public void testSerialization() throws Exception {
+		long id = Integer.MAX_VALUE + 123123L;
+		long timestamp = Integer.MAX_VALUE + 1228L;
+
+		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+		testSerialization(id, timestamp, checkpoint);
+
+		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+		testSerialization(id, timestamp, savepoint);
+	}
+
+	private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+
+		DataOutputSerializer out = new DataOutputSerializer(1024);
+		barrier.write(out);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
+		CheckpointBarrier deserialized = new CheckpointBarrier();
+		deserialized.read(in);
+
+		assertEquals(id, deserialized.getId());
+		assertEquals(timestamp, deserialized.getTimestamp());
+		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 271d0d2..e674eb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -25,25 +33,42 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
 import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
+public class EventSerializerTest {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+	@Test
+	public void testCheckpointBarrierSerialization() throws Exception {
+		long id = Integer.MAX_VALUE + 123123L;
+		long timestamp = Integer.MAX_VALUE + 1228L;
 
-public class EventSerializerTest {
+		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+		testCheckpointBarrierSerialization(id, timestamp, checkpoint);
+
+		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
+		testCheckpointBarrierSerialization(id, timestamp, savepoint);
+	}
+
+	private void testCheckpointBarrierSerialization(long id, long timestamp, CheckpointOptions options) throws IOException {
+		ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
+		ByteBuffer serialized = EventSerializer.toSerializedEvent(barrier);
+		CheckpointBarrier deserialized = (CheckpointBarrier) EventSerializer.fromSerializedEvent(serialized, cl);
+		assertFalse(serialized.hasRemaining());
+
+		assertEquals(id, deserialized.getId());
+		assertEquals(timestamp, deserialized.getTimestamp());
+		assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType());
+		assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation());
+	}
 
 	@Test
 	public void testSerializeDeserializeEvent() throws Exception {
 		AbstractEvent[] events = {
 				EndOfPartitionEvent.INSTANCE,
 				EndOfSuperstepEvent.INSTANCE,
-				new CheckpointBarrier(1678L, 4623784L),
+				new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
 				new TestTaskEvent(Math.random(), 12361231273L),
 				new CancelCheckpointMarker(287087987329842L)
 		};
@@ -94,7 +119,7 @@ public class EventSerializerTest {
 		AbstractEvent[] events = {
 			EndOfPartitionEvent.INSTANCE,
 			EndOfSuperstepEvent.INSTANCE,
-			new CheckpointBarrier(1678L, 4623784L),
+			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
 			new TestTaskEvent(Math.random(), 12361231273L),
 			new CancelCheckpointMarker(287087987329842L)
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63175ed..900b5c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -327,7 +328,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L);
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint());
 
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
@@ -363,7 +364,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L);
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint());
 
 		// Emit records on some channels first (requesting buffers), then
 		// broadcast the event. The record buffers should be emitted first, then

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index de54d1f..5a38be2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -601,7 +602,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare(
 					String.valueOf(UUID.randomUUID()),
 					InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
@@ -619,7 +620,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("should not be called!");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index db45231..bc420cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -49,7 +50,7 @@ public class CheckpointMessagesTest {
 			NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
 			testSerializabilityEqualsHashCode(cc);
 
-			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
+			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint());
 			testSerializabilityEqualsHashCode(tc);
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 5bd085f..94df524 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -165,7 +166,7 @@ public class OperatorStateBackendTest {
 		listState3.add(20);
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
-		OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory).get();
+		OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get();
 
 		try {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3b0350d..f2416b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -191,7 +192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -202,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("u3");
 
 		// draw another snapshot
-		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -403,7 +404,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(13, (int) state2.value());
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(
@@ -476,7 +477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(42L, (long) state.value());
 
 		// draw a snapshot
-		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -521,7 +522,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -532,7 +533,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("u3");
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -620,7 +621,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -631,7 +632,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("u3");
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -722,7 +723,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -734,7 +735,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add(103);
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -829,7 +830,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// make some more modifications
 			backend.setCurrentKey(1);
@@ -841,7 +842,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
 			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+			KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			// validate the original state
 			backend.setCurrentKey(1);
@@ -1163,7 +1164,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("ShouldBeInSecondHalf");
 
 
-		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory));
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		List<KeyGroupsStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles(
 				Collections.singletonList(snapshot),
@@ -1230,7 +1231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.update("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1281,7 +1282,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1334,7 +1335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1385,7 +1386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.put("2", "Second");
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory));
+			KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -1661,7 +1662,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
 		backend.dispose();
 
@@ -1692,7 +1693,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
 			// draw a snapshot
-			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
+			KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
 			assertNull(snapshot);
 			backend.dispose();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
new file mode 100644
index 0000000..a29d29c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FsSavepointStreamFactoryTest {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	/**
+	 * Verifies that state written through the factory ends up as a single file directly
+	 * inside the configured directory, i.e. no subdirectories are created along the way.
+	 */
+	@Test
+	public void testSavepointStreamDirectoryLayout() throws Exception {
+		File testRoot = folder.newFolder();
+		JobID jobId = new JobID();
+
+		FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory(
+				new Path(testRoot.getAbsolutePath()),
+				jobId,
+				0);
+		// Constructing the factory alone must leave the target directory empty.
+		File[] listed = testRoot.listFiles();
+		assertNotNull(listed);
+		assertEquals(0, listed.length);
+
+		FsCheckpointStateOutputStream stream = savepointStreamFactory
+			.createCheckpointStateOutputStream(1273, 19231);
+
+		stream.write(1);
+
+		FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+		// After closing, the directory must hold exactly one entry: the file behind the handle.
+		listed = testRoot.listFiles();
+		assertNotNull(listed);
+		assertEquals(1, listed.length);
+		assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 187163d..89ae5da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -91,7 +92,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 			
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L);
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
 			}
 			
 			triggerLatch.await();
@@ -121,7 +122,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L);
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
 				task.notifyCheckpointComplete(i);
 			}
 
@@ -226,7 +227,7 @@ public class TaskAsyncCallTest {
 		public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
 			lastCheckpointId++;
 			if (checkpointMetaData.getCheckpointId() == lastCheckpointId) {
 				if (lastCheckpointId == NUM_CALLS) {
@@ -243,7 +244,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			throw new UnsupportedOperationException("Should not be called");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 144247f..05fda28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.IOException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
@@ -36,6 +37,8 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -340,17 +343,19 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
+	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 
 		OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
 
+		CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
+
 		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 				checkpointId,
 				timestamp,
-				checkpointStreamFactory,
+				factory,
 				keyGroupRange,
 				getContainingTask().getCancelables())) {
 
@@ -361,12 +366,12 @@ public abstract class AbstractStreamOperator<OUT>
 
 			if (null != operatorStateBackend) {
 				snapshotInProgress.setOperatorStateManagedFuture(
-					operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+					operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
 			}
 
 			if (null != keyedStateBackend) {
 				snapshotInProgress.setKeyedStateManagedFuture(
-					keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
+					keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
 			}
 		} catch (Exception snapshotException) {
 			try {
@@ -431,11 +436,12 @@ public abstract class AbstractStreamOperator<OUT>
 	@SuppressWarnings("deprecation")
 	@Deprecated
 	@Override
-	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 		if (this instanceof StreamCheckpointedOperator) {
+			CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
 
 			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+				factory.createCheckpointStateOutputStream(checkpointId, timestamp);
 
 			getContainingTask().getCancelables().registerClosable(outStream);
 
@@ -495,6 +501,31 @@ public abstract class AbstractStreamOperator<OUT>
 	@Override
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {}
 
+	/**
+	 * Selects the checkpoint stream factory to use for a snapshot, based on the
+	 * checkpoint type carried by the given options.
+	 *
+	 * <p>{@link CheckpointType#FULL_CHECKPOINT}: returns this operator's shared
+	 * {@code checkpointStreamFactory}. {@link CheckpointType#SAVEPOINT}: requests a
+	 * factory from {@code container} for the options' target location on each call.
+	 *
+	 * @param checkpointOptions Options for the checkpoint; dereferenced, so not {@code null}
+	 * @return Checkpoint stream factory to write this checkpoint or savepoint with
+	 * @throws IOException Failures while creating a new stream factory are forwarded
+	 * @throws IllegalStateException If the checkpoint type is neither of the above
+	 */
+	@VisibleForTesting
+	CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
+		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			return checkpointStreamFactory;
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());
+		} else {
+			throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties and Services
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
index 5a6c37b..83697ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.ExceptionUtils;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * Result of {@link AbstractStreamOperator#snapshotState}.
+ * Result of {@link StreamOperator#snapshotState}.
  */
 public class OperatorSnapshotResult {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index d8e4d08..006e910 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -98,7 +99,10 @@ public interface StreamOperator<OUT> extends Serializable {
 	 * 
 	 * @throws Exception exception that happened during snapshotting.
 	 */
-	OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+	OperatorSnapshotResult snapshotState(
+		long checkpointId,
+		long timestamp,
+		CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
@@ -110,7 +114,10 @@ public interface StreamOperator<OUT> extends Serializable {
 	 */
 	@SuppressWarnings("deprecation")
 	@Deprecated
-	StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
+	StreamStateHandle snapshotLegacyOperatorState(
+		long checkpointId,
+		long timestamp,
+		CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Provides state handles to restore the operator state.
@@ -142,4 +149,5 @@ public interface StreamOperator<OUT> extends Serializable {
 	void setChainingStrategy(ChainingStrategy strategy);
 
 	MetricGroup getMetricGroup();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 611bd44..2da8389 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -368,7 +368,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					.setBytesBufferedInAlignment(bytesBuffered)
 					.setAlignmentDurationNanos(latestAlignmentDurationNanos);
 
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+				checkpointMetaData,
+				checkpointBarrier.getCheckpointOptions(),
+				checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 77608c6..8b1b65b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -132,7 +133,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
+			notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
 			return;
 		}
 
@@ -170,7 +171,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 						LOG.debug("Received all barriers for checkpoint {}", barrierId);
 					}
 
-					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
+					notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
 				}
 			}
 		}
@@ -248,14 +249,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		}
 	}
 
-	private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+	private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 		if (toNotifyOnCheckpoint != null) {
 			CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 				.setBytesBufferedInAlignment(0L)
 				.setAlignmentDurationNanos(0L);
 
-			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointMetrics);
+			toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4f07182..dd93592 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -164,9 +165,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
-	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+	public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
 		try {
-			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+			CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
 			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
 				streamOutput.broadcastEvent(barrier);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 62cfb8f..938ffd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -163,7 +164,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private TaskStateHandles restoreStateHandles;
 
-
 	/** The currently active background materialization threads */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
@@ -520,14 +520,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 		try {
 			// No alignment if we inject a checkpoint
 			CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 					.setBytesBufferedInAlignment(0L)
 					.setAlignmentDurationNanos(0L);
 
-			return performCheckpoint(checkpointMetaData, checkpointMetrics);
+			return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -543,9 +543,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+	public void triggerCheckpointOnBarrier(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
 		try {
-			performCheckpoint(checkpointMetaData, checkpointMetrics);
+			performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 		catch (CancelTaskException e) {
 			throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " +
@@ -570,8 +574,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), getName());
+	private boolean performCheckpoint(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
+		LOG.debug("Starting checkpoint ({}) {} on task {}",
+			checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
 
 		synchronized (lock) {
 			if (isRunning) {
@@ -582,9 +591,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
 				// can start their checkpoint work as soon as possible
 				operatorChain.broadcastCheckpointBarrier(
-						checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
+						checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions);
 
-				checkpointState(checkpointMetaData, checkpointMetrics);
+				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 				return true;
 			}
 			else {
@@ -637,8 +648,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
-		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointMetrics);
+	private void checkpointState(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+
+		CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
+			this,
+			checkpointMetaData,
+			checkpointOptions,
+			checkpointMetrics);
+
 		checkpointingOperation.executeCheckpointing();
 	}
 
@@ -814,7 +834,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return stateBackend.createStreamFactory(
 				getEnvironment().getJobID(),
 				createOperatorIdentifier(operator, configuration.getVertexID()));
+	}
 
+	public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> operator, String targetLocation) throws IOException {
+		return stateBackend.createSavepointStreamFactory(
+			getEnvironment().getJobID(),
+			createOperatorIdentifier(operator, configuration.getVertexID()),
+			targetLocation);
 	}
 
 	private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
@@ -1048,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final StreamTask<?, ?> owner;
 
 		private final CheckpointMetaData checkpointMetaData;
+		private final CheckpointOptions checkpointOptions;
 		private final CheckpointMetrics checkpointMetrics;
 
 		private final StreamOperator<?>[] allOperators;
@@ -1060,9 +1087,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		private final List<StreamStateHandle> nonPartitionedStates;
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
-		public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
+		public CheckpointingOperation(
+				StreamTask<?, ?> owner,
+				CheckpointMetaData checkpointMetaData,
+				CheckpointOptions checkpointOptions,
+				CheckpointMetrics checkpointMetrics) {
+
 			this.owner = Preconditions.checkNotNull(owner);
 			this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
+			this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
 			this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
 			this.allOperators = owner.operatorChain.getAllOperators();
 			this.nonPartitionedStates = new ArrayList<>(allOperators.length);
@@ -1137,14 +1170,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		@SuppressWarnings("deprecation")
 		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 			if (null != op) {
-				// first call the legacy checkpoint code paths 
+				// first call the legacy checkpoint code paths
 				nonPartitionedStates.add(op.snapshotLegacyOperatorState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp()));
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions));
 
 				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp());
+						checkpointMetaData.getTimestamp(),
+						checkpointOptions);
 
 				snapshotInProgressList.add(snapshotInProgress);
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 6751617..51b9d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -100,4 +100,4 @@ public class ListCheckpointedTest {
 			return restored;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 274611a..8507200 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -17,12 +17,34 @@
  */
 package org.apache.flink.streaming.api.operators;
 
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.RunnableFuture;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,27 +67,6 @@ import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.RunnableFuture;
-
-import static junit.framework.TestCase.assertTrue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.spy;
-import static org.powermock.api.mockito.PowerMockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
-
 /**
  * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly
  * tests timers and state and whether they are correctly checkpointed/restored
@@ -495,10 +496,10 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
-		operator.snapshotState(checkpointId, timestamp);
+		operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 
 		verify(context).close();
 	}
@@ -524,14 +525,14 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		// lets fail when calling the actual snapshotState method
 		doThrow(failingException).when(operator).snapshotState(eq(context));
 
 		try {
-			operator.snapshotState(checkpointId, timestamp);
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
@@ -571,23 +572,29 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+
+		// The amount of mocking in this test makes it necessary to make the
+		// getCheckpointStreamFactory method visible for the test and to
+		// override its behaviour.
+		when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
+
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
 
 		OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class);
-		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle);
+		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
 		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
-		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException);
+		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
 
 		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
 		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
 		Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
 
 		try {
-			operator.snapshotState(checkpointId, timestamp);
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index c4ddea8..d331171 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -90,8 +91,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
 			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
 			"org.apache.flink.streaming.api.operators.Output], " +
-			"snapshotLegacyOperatorState[long, long], " +
-			"snapshotState[long, long]]";
+			"snapshotLegacyOperatorState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions], " +
+			"snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
 
 	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
 			", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -240,7 +241,8 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 						try {
 							runStarted.await();
 							if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
-									new CheckpointMetaData(0, System.currentTimeMillis()))) {
+									new CheckpointMetaData(0, System.currentTimeMillis()),
+									CheckpointOptions.forFullCheckpoint())) {
 								LifecycleTrackingStreamSource.runFinish.trigger();
 							}
 						} catch (Exception e) {
@@ -260,9 +262,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
-		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
-			return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+			return super.snapshotLegacyOperatorState(checkpointId, timestamp, checkpointOptions);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index b1689f9..ab4258f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -184,4 +184,4 @@ public class WrappingFunctionSnapshotRestoreTest {
 			return value;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 907f8f1..c4867ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -522,7 +523,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		task.triggerCheckpoint(checkpointMetaData);
+		task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		env.getCheckpointLatch().await();
 
@@ -557,7 +558,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
 
 		// trigger the checkpoint while processing stream elements
-		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp));
+		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint());
 
 		restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));