You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/23 19:10:31 UTC

[3/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 46f228a..e407443 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -155,7 +156,9 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// no call for a completed checkpoint must have happened
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class),
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+			any(CheckpointMetaData.class),
+			any(CheckpointOptions.class),
 			any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
@@ -242,7 +245,8 @@ public class BarrierBufferAlignmentLimitTest {
 		// checkpoint 4 completed - check and validate buffered replay
 		check(sequence[9], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(4L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(
+			argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		check(sequence[10], buffer.getNextNonBlocked());
 		check(sequence[15], buffer.getNextNonBlocked());
@@ -254,7 +258,8 @@ public class BarrierBufferAlignmentLimitTest {
 		check(sequence[21], buffer.getNextNonBlocked());
 
 		// only checkpoint 4 was successfully completed, not checkpoint 3
-		verify(toNotify, times(0)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(
+			argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		assertNull(buffer.getNextNonBlocked());
 		assertNull(buffer.getNextNonBlocked());
@@ -284,7 +289,7 @@ public class BarrierBufferAlignmentLimitTest {
 	}
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static void check(BufferOrEvent expected, BufferOrEvent present) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 0cf866a..6e088f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -151,7 +152,7 @@ public class BarrierBufferMassiveRandomTest {
 
 			if (barrierGens[currentChannel].isNextBarrier()) {
 				return new BufferOrEvent(
-						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis()),
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
 							currentChannel);
 			} else {
 				Buffer buffer = bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 869d1fe..d6056d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -566,7 +567,7 @@ public class BarrierBufferTest {
 			// checkpoint done - replay buffered
 			check(sequence[5], buffer.getNextNonBlocked());
 			validateAlignmentTime(startTs, buffer);
-			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+			verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			check(sequence[9], buffer.getNextNonBlocked());
@@ -1008,14 +1009,14 @@ public class BarrierBufferTest {
 
 		check(sequence[0], buffer.getNextNonBlocked());
 		check(sequence[2], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[6], buffer.getNextNonBlocked());
 		assertEquals(5L, buffer.getCurrentCheckpointId());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		check(sequence[8], buffer.getNextNonBlocked());
@@ -1078,7 +1079,7 @@ public class BarrierBufferTest {
 		check(sequence[2], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[5], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[6], buffer.getNextNonBlocked());
@@ -1097,7 +1098,7 @@ public class BarrierBufferTest {
 		check(sequence[16], buffer.getNextNonBlocked());
 		startTs = System.nanoTime();
 		check(sequence[20], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[21], buffer.getNextNonBlocked());
 
@@ -1114,7 +1115,7 @@ public class BarrierBufferTest {
 		// a simple successful checkpoint
 		startTs = System.nanoTime();
 		check(sequence[32], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 		check(sequence[33], buffer.getNextNonBlocked());
 
@@ -1175,7 +1176,7 @@ public class BarrierBufferTest {
 
 		// finished first checkpoint
 		check(sequence[3], buffer.getNextNonBlocked());
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		validateAlignmentTime(startTs, buffer);
 
 		check(sequence[5], buffer.getNextNonBlocked());
@@ -1198,7 +1199,7 @@ public class BarrierBufferTest {
 		assertEquals(0L, buffer.getAlignmentDurationNanos());
 
 		// no further checkpoint (abort) notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
 
 		// all done
@@ -1280,7 +1281,7 @@ public class BarrierBufferTest {
 		// checkpoint done
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointMetrics.class));
+		verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 
 		// queued data
 		check(sequence[10], buffer.getNextNonBlocked());
@@ -1299,7 +1300,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1364,7 +1365,7 @@ public class BarrierBufferTest {
 		// checkpoint finished
 		check(sequence[7], buffer.getNextNonBlocked());
 		validateAlignmentTime(startTs, buffer);
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		check(sequence[11], buffer.getNextNonBlocked());
 
 		// remaining data
@@ -1380,7 +1381,7 @@ public class BarrierBufferTest {
 		checkNoTempFilesRemain();
 
 		// check overall notifications
-		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointMetrics.class));
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class));
 		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
 	}
 
@@ -1389,7 +1390,7 @@ public class BarrierBufferTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long checkpointId, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
@@ -1487,12 +1488,12 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("wrong checkpoint id",
 					nextExpectedCheckpointId == -1L || 
 					nextExpectedCheckpointId == checkpointMetaData.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index da322f6..05f7da6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -470,7 +471,7 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long id, int channel) {
@@ -502,12 +503,12 @@ public class BarrierTrackerTest {
 		}
 
 		@Override
-		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
+		public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 			throw new UnsupportedOperationException("should never be called");
 		}
 
 		@Override
-		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception {
+		public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
 
 			final long expectedId = checkpointIDs[i++];

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index 5c0f0cf..51294ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -174,6 +175,11 @@ public class BlockingCheckpointsTest {
 		}
 
 		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, String targetLocation) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 				Environment env, JobID jobID, String operatorIdentifier,
 				TypeSerializer<K> keySerializer, int numberOfKeyGroups,
@@ -276,7 +282,7 @@ public class BlockingCheckpointsTest {
 
 		@Override
 		protected void run() throws Exception {
-			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), new CheckpointMetrics());
+			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 69c2c88..e22bf86 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -408,7 +409,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -427,14 +428,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 		// we should not yet see the barrier, only the two elements from non-blocked input
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 
@@ -467,7 +468,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -488,15 +489,15 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
@@ -504,9 +505,9 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
@@ -557,7 +558,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		while(!streamTask.triggerCheckpoint(checkpointMetaData));
+		while(!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()));
 
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0773699..1a6fa8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -231,7 +232,7 @@ public class SourceStreamTaskTest {
 			for (int i = 0; i < numCheckpoints; i++) {
 				long currentCheckpointId = checkpointId.getAndIncrement();
 				CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
-				sourceTask.triggerCheckpoint(checkpointMetaData);
+				sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 				Thread.sleep(checkpointInterval);
 			}
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index c2d4aaa..53f77ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -56,7 +57,8 @@ public class StreamTaskCancellationBarrierTest {
 		testHarness.invoke();
 
 		// tell the task to commence a checkpoint
-		boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()));
+		boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()),
+			CheckpointOptions.forFullCheckpoint());
 		assertFalse("task triggered checkpoint though not ready", result);
 
 		// a cancellation barrier should be downstream

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 1e74c3e..3d01fdd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -305,18 +306,18 @@ public class StreamTaskTest extends TestLogger {
 
 		final Exception testException = new Exception("Test exception");
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
 
 		// mock the returned legacy snapshots
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
 		// set up the task
 
@@ -332,7 +333,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
 		try {
-			streamTask.triggerCheckpoint(checkpointMetaData);
+			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 			fail("Expected test exception here.");
 		} catch (Exception e) {
 			assertEquals(testException, e.getCause());
@@ -380,18 +381,18 @@ public class StreamTaskTest extends TestLogger {
 
 		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
+		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
 
 		// mock the legacy state snapshot
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
-		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
-		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(streamStateHandle3);
 
 		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
@@ -405,7 +406,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
 
@@ -468,7 +469,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -495,7 +496,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		acknowledgeCheckpointLatch.await();
 
@@ -584,7 +585,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -613,7 +614,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData);
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
 
 		createSubtask.await();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index c0a1638..d465619 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -225,7 +226,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// This element should be buffered since we received a checkpoint barrier on
 		// this input
@@ -262,16 +263,16 @@ public class TwoInputStreamTaskTest {
 			expectedOutput,
 			testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 		testHarness.endInput();
 		testHarness.waitForTaskCompletion();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -306,7 +307,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -329,15 +330,15 @@ public class TwoInputStreamTaskTest {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
@@ -347,9 +348,9 @@ public class TwoInputStreamTaskTest {
 
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 01afec6..07424f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.migration.util.MigrationInstantiationUtil;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
@@ -478,11 +479,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls {@link StreamOperator#snapshotState(long, long)}.
+	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}.
 	 */
 	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
-		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
+		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(new JobID(), "test_op");
+
+		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
+			checkpointId,
+			timestamp,
+			CheckpointOptions.forFullCheckpoint());
 
 		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
 		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index cde5780..effb44c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -143,9 +144,11 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 
 		if (keyedStateBackend != null) {
-			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(checkpointId,
+			RunnableFuture<KeyGroupsStateHandle> keyedSnapshotRunnable = keyedStateBackend.snapshot(
+					checkpointId,
 					timestamp,
-					streamFactory);
+					streamFactory,
+					CheckpointOptions.forFullCheckpoint());
 			if(!keyedSnapshotRunnable.isDone()) {
 				Thread runner = new Thread(keyedSnapshotRunnable);
 				runner.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 128522b..ac37009 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import java.io.FileNotFoundException;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -160,21 +161,21 @@ public class SavepointITCase extends TestLogger {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
 			final File checkpointDir = new File(tmpDir, "checkpoints");
-			final File savepointDir = new File(tmpDir, "savepoints");
+			final File savepointRootDir = new File(tmpDir, "savepoints");
 
-			if (!checkpointDir.mkdir() || !savepointDir.mkdirs()) {
+			if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
 				fail("Test setup failed: failed to create temporary directories.");
 			}
 
 			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-			LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
+			LOG.info("Created temporary savepoint directory: " + savepointRootDir + ".");
 
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 			config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
-				savepointDir.toURI().toString());
+				savepointRootDir.toURI().toString());
 
 			LOG.info("Flink configuration: " + config + ".");
 
@@ -217,14 +218,6 @@ public class SavepointITCase extends TestLogger {
 				.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
 			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
 
-			// Only one savepoint should exist
-			File[] files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals("Savepoint not created in expected directory", 1, files.length);
-			} else {
-				fail("Savepoint not created in expected directory");
-			}
-
 			// Retrieve the savepoint from the testing job manager
 			LOG.info("Requesting the savepoint.");
 			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
@@ -240,15 +233,33 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
+			// Only one savepoint should exist
+			File[] files = savepointRootDir.listFiles();
+
+			if (files != null) {
+				assertEquals("Savepoint not created in expected directory", 1, files.length);
+				assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+
+				File savepointDir = files[0];
+				File[] savepointFiles = savepointDir.listFiles();
+				assertNotNull(savepointFiles);
+				assertTrue("Did not write savepoint files to directory",savepointFiles.length > 1);
+			} else {
+				fail("Savepoint not created in expected directory");
+			}
+
 			// Only one checkpoint of the savepoint should exist
 			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
-			files = checkpointDir.listFiles();
-			assertNotNull("Checkpoint directory empty", files);
-			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
-			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
+			File jobCheckpoints = new File(checkpointDir, jobId.toString());
+
+			if (jobCheckpoints.exists()) {
+				files = jobCheckpoints.listFiles();
+				assertNotNull("Checkpoint directory empty", files);
+				assertEquals("Checkpoints directory not cleaned up: " + Arrays.toString(files), 0, files.length);
+			}
 
 			// Only one savepoint should exist
-			files = savepointDir.listFiles();
+			files = savepointRootDir.listFiles();
 			assertNotNull("Savepoint directory empty", files);
 			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
@@ -399,8 +410,8 @@ public class SavepointITCase extends TestLogger {
 
 			// All savepoints should have been cleaned up
 			errMsg = "Savepoints directory not cleaned up properly: " +
-				Arrays.toString(savepointDir.listFiles()) + ".";
-			assertEquals(errMsg, 0, savepointDir.listFiles().length);
+				Arrays.toString(savepointRootDir.listFiles()) + ".";
+			assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
 
 			// - Verification END ---------------------------------------------
 		} finally {
@@ -468,7 +479,7 @@ public class SavepointITCase extends TestLogger {
 				flink.submitJobAndWait(jobGraph, false);
 			} catch (Exception e) {
 				assertEquals(JobExecutionException.class, e.getClass());
-				assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+				assertEquals(FileNotFoundException.class, e.getCause().getClass());
 			}
 		} finally {
 			if (flink != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ec6a8f5..79665dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -86,7 +86,6 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-
 	public static class FailingStateBackend extends AbstractStateBackend {
 		private static final long serialVersionUID = 1L;
 
@@ -97,6 +96,12 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
+		public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId,
+			String operatorIdentifier, String targetLocation) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
 		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 				Environment env,
 				JobID jobID,