You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/27 16:26:25 UTC

[8/8] flink git commit: [FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options

[FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options

Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when actually,
the checkpoints may always be incremental and only savepoints have to be full
and self-contained.

Initially, we planned to add options for multiple checkpoints, like checkpoints
that were forced to be full, and checkpoints that were incremental. That
is not necessary at this point.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe3b2768
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe3b2768
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe3b2768

Branch: refs/heads/master
Commit: fe3b276818eec1d4a70632a45343d70dc2be53f3
Parents: a6df90e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 25 14:16:37 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 27 17:37:23 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 +-
 .../state/RocksDBStateBackendTest.java          | 12 ++--
 .../checkpoint/CheckpointCoordinator.java       |  4 +-
 .../runtime/checkpoint/CheckpointOptions.java   |  8 +--
 .../api/serialization/EventSerializer.java      |  6 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  4 +-
 .../checkpoint/CheckpointOptionsTest.java       |  4 +-
 .../runtime/checkpoint/CheckpointTypeTest.java  |  2 +-
 .../io/network/api/CheckpointBarrierTest.java   |  2 +-
 .../api/serialization/EventSerializerTest.java  |  6 +-
 .../io/network/api/writer/RecordWriterTest.java |  4 +-
 .../messages/CheckpointMessagesTest.java        |  2 +-
 .../runtime/state/MemoryStateBackendTest.java   |  4 +-
 .../runtime/state/OperatorStateBackendTest.java | 14 ++--
 .../runtime/state/StateBackendTestBase.java     | 74 ++++++++++----------
 .../state/StateSnapshotCompressionTest.java     |  2 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java |  2 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  4 +-
 .../api/operators/AbstractStreamOperator.java   |  4 +-
 .../runtime/tasks/SourceStreamTask.java         |  2 +-
 .../operators/AbstractStreamOperatorTest.java   |  8 +--
 .../AbstractUdfStreamOperatorLifecycleTest.java |  2 +-
 .../operators/async/AsyncWaitOperatorTest.java  |  4 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |  2 +-
 .../io/BarrierBufferMassiveRandomTest.java      |  2 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  2 +-
 .../runtime/io/BarrierTrackerTest.java          |  2 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |  2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 30 ++++----
 .../runtime/tasks/RestoreStreamTaskTest.java    |  2 +-
 .../SourceExternalCheckpointTriggerTest.java    |  4 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |  2 +-
 .../StreamTaskCancellationBarrierTest.java      |  2 +-
 .../tasks/StreamTaskTerminationTest.java        |  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 10 +--
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 28 ++++----
 .../util/AbstractStreamOperatorTestHarness.java |  2 +-
 .../ExternalizedCheckpointITCase.java           |  2 +-
 38 files changed, 137 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index b519b1a..2ba0494 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -183,7 +183,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpoint());
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -284,7 +284,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		task.triggerCheckpoint(
 			new CheckpointMetaData(42, 17),
-			CheckpointOptions.forFullCheckpoint());
+			CheckpointOptions.forCheckpoint());
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		blockerCheckpointStreamFactory.getWaiterLatch().await();
@@ -355,7 +355,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 				new ValueStateDescriptor<>("foobar", String.class));
 
 			RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
-				checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
+				checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forCheckpoint());
 
 			try {
 				FutureUtil.runIfNotDoneAndGet(snapshotFuture);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index a2ec052..e1be744 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -250,7 +250,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 
 		try {
 			RunnableFuture<KeyedStateHandle> snapshot =
-				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
 
 			RocksDB spyDB = keyedStateBackend.db;
 
@@ -287,7 +287,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		setupRocksKeyedStateBackend();
 		try {
 			RunnableFuture<KeyedStateHandle> snapshot =
-				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
 			snapshot.cancel(true);
 			verifyRocksObjectsReleased();
 		} finally {
@@ -301,7 +301,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		setupRocksKeyedStateBackend();
 		try {
 			RunnableFuture<KeyedStateHandle> snapshot =
-				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
 			snapshot.cancel(true);
 			Thread asyncSnapshotThread = new Thread(snapshot);
 			asyncSnapshotThread.start();
@@ -324,7 +324,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		setupRocksKeyedStateBackend();
 		try {
 			RunnableFuture<KeyedStateHandle> snapshot =
-				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
 			Thread asyncSnapshotThread = new Thread(snapshot);
 			asyncSnapshotThread.start();
 			waiter.await(); // wait for snapshot to run
@@ -349,7 +349,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	public void testCancelRunningSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
 		try {
-			RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+			RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpoint());
 			Thread asyncSnapshotThread = new Thread(snapshot);
 			asyncSnapshotThread.start();
 			waiter.await(); // wait for snapshot to run
@@ -429,7 +429,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 						checkpointId,
 						checkpointId,
 						createStreamFactory(),
-						CheckpointOptions.forFullCheckpoint());
+						CheckpointOptions.forCheckpoint());
 
 					snapshot.run();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index f932354..62d92ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -435,7 +435,7 @@ public class CheckpointCoordinator {
 			case SAVEPOINT:
 				return triggerSavepoint(timestamp, options.getTargetLocation());
 
-			case FULL_CHECKPOINT:
+			case CHECKPOINT:
 				CheckpointTriggerResult triggerResult =
 					triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false);
 
@@ -666,7 +666,7 @@ public class CheckpointCoordinator {
 
 				CheckpointOptions checkpointOptions;
 				if (!props.isSavepoint()) {
-					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+					checkpointOptions = CheckpointOptions.forCheckpoint();
 				} else {
 					checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 676cf3b..27e5d0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -79,10 +79,10 @@ public class CheckpointOptions implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-	private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+	private static final CheckpointOptions CHECKPOINT = new CheckpointOptions(CheckpointType.CHECKPOINT, null);
 
-	public static CheckpointOptions forFullCheckpoint() {
-		return FULL_CHECKPOINT;
+	public static CheckpointOptions forCheckpoint() {
+		return CHECKPOINT;
 	}
 
 	public static CheckpointOptions forSavepoint(String targetDirectory) {
@@ -98,7 +98,7 @@ public class CheckpointOptions implements Serializable {
 	public enum CheckpointType {
 
 		/** A full checkpoint. */
-		FULL_CHECKPOINT,
+		CHECKPOINT,
 
 		/** A savepoint. */
 		SAVEPOINT;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 3adf864..aa8133f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -70,7 +70,7 @@ public class EventSerializer {
 			CheckpointType checkpointType = checkpointOptions.getCheckpointType();
 
 			ByteBuffer buf;
-			if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			if (checkpointType == CheckpointType.CHECKPOINT) {
 				buf = ByteBuffer.allocate(24);
 				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
 				buf.putLong(4, barrier.getId());
@@ -209,8 +209,8 @@ public class EventSerializer {
 				Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal");
 				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
 
-				if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+				if (checkpointType == CheckpointType.CHECKPOINT) {
+					checkpointOptions = CheckpointOptions.forCheckpoint();
 				} else if (checkpointType == CheckpointType.SAVEPOINT) {
 					int len = buffer.getInt();
 					byte[] bytes = new byte[len];

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 282cb56..51d2142 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -337,8 +337,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			assertFalse(checkpoint.isFullyAcknowledged());
 
 			// check that the vertices received the trigger checkpoint message
-			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
-			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+			verify(vertex1.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
+			verify(vertex2.getCurrentExecutionAttempt()).triggerCheckpoint(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
 
 			// acknowledge from one of the tasks
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
index 6788338..c0eed37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
@@ -28,8 +28,8 @@ public class CheckpointOptionsTest {
 
 	@Test
 	public void testFullCheckpoint() throws Exception {
-		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
-		assertEquals(CheckpointType.FULL_CHECKPOINT, options.getCheckpointType());
+		CheckpointOptions options = CheckpointOptions.forCheckpoint();
+		assertEquals(CheckpointType.CHECKPOINT, options.getCheckpointType());
 		assertNull(options.getTargetLocation());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
index dfbde5e..5aa478c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -36,7 +36,7 @@ public class CheckpointTypeTest {
 	 */
 	@Test
 	public void testOrdinalsAreConstant() {
-		assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+		assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
 		assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ba833c3..c8bffb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -37,7 +37,7 @@ public class CheckpointBarrierTest {
 		long id = Integer.MAX_VALUE + 123123L;
 		long timestamp = Integer.MAX_VALUE + 1228L;
 
-		CheckpointOptions options = CheckpointOptions.forFullCheckpoint();
+		CheckpointOptions options = CheckpointOptions.forCheckpoint();
 		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index f51b083..f79ab0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -42,7 +42,7 @@ public class EventSerializerTest {
 		long id = Integer.MAX_VALUE + 123123L;
 		long timestamp = Integer.MAX_VALUE + 1228L;
 
-		CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint();
+		CheckpointOptions checkpoint = CheckpointOptions.forCheckpoint();
 		testCheckpointBarrierSerialization(id, timestamp, checkpoint);
 
 		CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123");
@@ -68,7 +68,7 @@ public class EventSerializerTest {
 		AbstractEvent[] events = {
 				EndOfPartitionEvent.INSTANCE,
 				EndOfSuperstepEvent.INSTANCE,
-				new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
+				new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpoint()),
 				new TestTaskEvent(Math.random(), 12361231273L),
 				new CancelCheckpointMarker(287087987329842L)
 		};
@@ -119,7 +119,7 @@ public class EventSerializerTest {
 		AbstractEvent[] events = {
 			EndOfPartitionEvent.INSTANCE,
 			EndOfSuperstepEvent.INSTANCE,
-			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forFullCheckpoint()),
+			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpoint()),
 			new TestTaskEvent(Math.random(), 12361231273L),
 			new CancelCheckpointMarker(287087987329842L)
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 98d4f65..df5c616 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -328,7 +328,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forFullCheckpoint());
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpoint());
 
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
@@ -364,7 +364,7 @@ public class RecordWriterTest {
 
 		ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider);
 		RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
-		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forFullCheckpoint());
+		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpoint());
 
 		// Emit records on some channels first (requesting buffers), then
 		// broadcast the event. The record buffers should be emitted first, then

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index b36ac86..5cda230 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -53,7 +53,7 @@ public class CheckpointMessagesTest {
 			NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
 			testSerializabilityEqualsHashCode(cc);
 
-			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forFullCheckpoint());
+			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L, CheckpointOptions.forCheckpoint());
 			testSerializabilityEqualsHashCode(tc);
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index ed6bef4..8ad368b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -217,7 +217,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
 		try {
@@ -272,7 +272,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 			682375462378L,
 			2,
 			streamFactory,
-			CheckpointOptions.forFullCheckpoint()));
+			CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 0fff501..784ce9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -235,7 +235,7 @@ public class OperatorStateBackendTest {
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 		FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
 		// make sure that the copy method has been called
@@ -355,7 +355,7 @@ public class OperatorStateBackendTest {
 				abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 
 		RunnableFuture<OperatorStateHandle> snapshot =
-				operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+				operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
 
 		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
 		assertNull(stateHandle);
@@ -387,7 +387,7 @@ public class OperatorStateBackendTest {
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
 		try {
@@ -470,7 +470,7 @@ public class OperatorStateBackendTest {
 		streamFactory.setBlockerLatch(blockerLatch);
 
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 
 		ExecutorService executorService = Executors.newFixedThreadPool(1);
 
@@ -572,7 +572,7 @@ public class OperatorStateBackendTest {
 		streamFactory.setBlockerLatch(blockerLatch);
 
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 
 		ExecutorService executorService = Executors.newFixedThreadPool(1);
 
@@ -616,7 +616,7 @@ public class OperatorStateBackendTest {
 		streamFactory.setBlockerLatch(blockerLatch);
 
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+				operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 
 		ExecutorService executorService = Executors.newFixedThreadPool(1);
 
@@ -667,7 +667,7 @@ public class OperatorStateBackendTest {
 
 		CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
 		RunnableFuture<OperatorStateHandle> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
+			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpoint());
 		OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0a4b5bf..0ac607f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -272,7 +272,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			// backends that lazily serializes (such as memory state backend) will fail here
-			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 		} catch (ExpectedKryoTestException e) {
 			numExceptions++;
 		} catch (Exception e) {
@@ -332,7 +332,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			// backends that lazily serializes (such as memory state backend) will fail here
-			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 		} catch (ExpectedKryoTestException e) {
 			numExceptions++;
 		} catch (Exception e) {
@@ -386,7 +386,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			// backends that lazily serializes (such as memory state backend) will fail here
-			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 		} catch (ExpectedKryoTestException e) {
 			numExceptions++;
 		} catch (Exception e) {
@@ -442,7 +442,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		try {
 			// backends that lazily serializes (such as memory state backend) will fail here
-			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 		} catch (ExpectedKryoTestException e) {
 			numExceptions++;
 		} catch (Exception e) {
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				682375462378L,
 				2,
 				streamFactory,
-				CheckpointOptions.forFullCheckpoint()));
+				CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 
@@ -559,7 +559,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				682375462378L,
 				2,
 				streamFactory,
-				CheckpointOptions.forFullCheckpoint()));
+				CheckpointOptions.forCheckpoint()));
 
 			snapshot.registerSharedStates(sharedStateRegistry);
 			backend.dispose();
@@ -585,7 +585,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				682375462378L,
 				2,
 				streamFactory,
-				CheckpointOptions.forFullCheckpoint()));
+				CheckpointOptions.forCheckpoint()));
 
 			snapshot2.registerSharedStates(sharedStateRegistry);
 			snapshot.discardState();
@@ -661,7 +661,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				682375462378L,
 				2,
 				streamFactory,
-				CheckpointOptions.forFullCheckpoint()));
+				CheckpointOptions.forCheckpoint()));
 
 			snapshot.registerSharedStates(sharedStateRegistry);
 			backend.dispose();
@@ -686,7 +686,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				682375462378L,
 				2,
 				streamFactory,
-				CheckpointOptions.forFullCheckpoint()));
+				CheckpointOptions.forCheckpoint()));
 
 			snapshot2.registerSharedStates(sharedStateRegistry);
 
@@ -752,7 +752,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			682375462378L,
 			2,
 			streamFactory,
-			CheckpointOptions.forFullCheckpoint()));
+			CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 
@@ -782,7 +782,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			682375462378L,
 			2,
 			streamFactory,
-			CheckpointOptions.forFullCheckpoint()));
+			CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 	}
@@ -819,7 +819,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			682375462378L,
 			2,
 			streamFactory,
-			CheckpointOptions.forFullCheckpoint()));
+			CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 
@@ -849,7 +849,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			682375462378L,
 			2,
 			streamFactory,
-			CheckpointOptions.forFullCheckpoint()));
+			CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 	}
@@ -886,7 +886,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -897,7 +897,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("u3");
 
 		// draw another snapshot
-		KeyedStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -1096,7 +1096,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(13, (int) state2.value());
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(
@@ -1168,7 +1168,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals(42L, (long) state.value());
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
@@ -1213,7 +1213,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -1224,7 +1224,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.add("u3");
 
 		// draw another snapshot
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -1477,7 +1477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -1488,7 +1488,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.add("u3");
 
 		// draw another snapshot
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -2093,7 +2093,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -2105,7 +2105,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.add(103);
 
 		// draw another snapshot
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -2192,7 +2192,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// make some more modifications
 		backend.setCurrentKey(1);
@@ -2204,7 +2204,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
 		// draw another snapshot
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		// validate the original state
 		backend.setCurrentKey(1);
@@ -2511,7 +2511,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("ShouldBeInSecondHalf");
 
 
-		KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		List<KeyedStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles(
 				Collections.singletonList(snapshot),
@@ -2578,7 +2578,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		state.update("2");
 
 		// draw a snapshot
-		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 
@@ -2609,7 +2609,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.update("2");
 
 			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -2652,7 +2652,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -2697,7 +2697,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.add("2");
 
 			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -2740,7 +2740,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			state.put("2", "Second");
 
 			// draw a snapshot
-			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpoint()));
 
 			backend.dispose();
 			// restore the first snapshot and validate it
@@ -2999,7 +2999,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-		KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpoint()));
 
 		backend.dispose();
 
@@ -3031,7 +3031,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			// draw a snapshot
 			KeyedStateHandle snapshot =
-					FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint()));
+					FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forCheckpoint()));
 			assertNull(snapshot);
 			backend.dispose();
 
@@ -3120,7 +3120,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			RunnableFuture<KeyedStateHandle> snapshot1 =
-				backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+				backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
 
 			Thread runner1 = new Thread(snapshot1, "snapshot-1-runner");
 			runner1.start();
@@ -3138,7 +3138,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			streamFactory.setBlockerLatch(null);
 
 			RunnableFuture<KeyedStateHandle> snapshot2 =
-				backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forFullCheckpoint());
+				backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpoint());
 
 			Thread runner2 = new Thread(snapshot2,"snapshot-2-runner");
 			runner2.start();
@@ -3177,7 +3177,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			RunnableFuture<KeyedStateHandle> snapshot =
-					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
 			Thread runner = new Thread(snapshot);
 			runner.start();
 			for (int i = 0; i < 20; ++i) {
@@ -3261,7 +3261,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 			RunnableFuture<KeyedStateHandle> snapshot =
-					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+					backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
 
 			Thread runner = new Thread(snapshot);
 			runner.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 1aa6f63..168ed97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -135,7 +135,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			state.update("45");
 			CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
 			RunnableFuture<KeyedStateHandle> snapshot =
-				stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+				stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpoint());
 			snapshot.run();
 			stateHandle = snapshot.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 3754d63..9c3ebb1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -158,7 +158,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
 //					0L,
 //					0L,
 //					new MemCheckpointStreamFactory(4 * 1024 * 1024),
-//					CheckpointOptions.forFullCheckpoint());
+//					CheckpointOptions.forCheckpoint());
 //
 //			snapshot.run();
 //

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 82dd99e..d925e4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -94,7 +94,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 			
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
 			}
 			
 			triggerLatch.await();
@@ -124,7 +124,7 @@ public class TaskAsyncCallTest {
 			awaitLatch.await();
 
 			for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forFullCheckpoint());
+				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
 				task.notifyCheckpointComplete(i);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 1105e4e..58022a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -453,7 +453,7 @@ public abstract class AbstractStreamOperator<OUT>
 	/**
 	 * Returns a checkpoint stream factory for the provided options.
 	 *
-	 * <p>For {@link CheckpointType#FULL_CHECKPOINT} this returns the shared
+	 * <p>For {@link CheckpointType#CHECKPOINT} this returns the shared
 	 * factory of this operator.
 	 *
 	 * <p>For {@link CheckpointType#SAVEPOINT} it creates a custom factory per
@@ -466,7 +466,7 @@ public abstract class AbstractStreamOperator<OUT>
 	@VisibleForTesting
 	CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
 		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
-		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+		if (checkpointType == CheckpointType.CHECKPOINT) {
 			return checkpointStreamFactory;
 		} else if (checkpointType == CheckpointType.SAVEPOINT) {
 			return container.createSavepointStreamFactory(this, checkpointOptions.getTargetLocation());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index a36e015..408c843 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -63,7 +63,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 					// TODO - we need to see how to derive those. We should probably not encode this in the
 					// TODO -   source's trigger message, but do a handshake in this task between the trigger
 					// TODO -   message from the master, and the source's trigger notification
-					final CheckpointOptions checkpointOptions = CheckpointOptions.forFullCheckpoint();
+					final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpoint();
 					final long timestamp = System.currentTimeMillis();
 
 					final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 798c81f..847ff13 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -501,7 +501,7 @@ public class AbstractStreamOperatorTest {
 		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
-		operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+		operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
 
 		verify(context).close();
 	}
@@ -534,7 +534,7 @@ public class AbstractStreamOperatorTest {
 		doThrow(failingException).when(operator).snapshotState(eq(context));
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
@@ -589,14 +589,14 @@ public class AbstractStreamOperatorTest {
 		when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
 		AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
-		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forFullCheckpoint()))).thenThrow(failingException);
+		when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory), eq(CheckpointOptions.forCheckpoint()))).thenThrow(failingException);
 
 		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
 		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
 		Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forFullCheckpoint());
+			operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpoint());
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 4ed689d..f16ea2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -243,7 +243,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 							runStarted.await();
 							if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
 									new CheckpointMetaData(0, System.currentTimeMillis()),
-									CheckpointOptions.forFullCheckpoint())) {
+									CheckpointOptions.forCheckpoint())) {
 								LifecycleTrackingStreamSource.runFinish.trigger();
 							}
 						} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 831140a..993bffb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -528,7 +528,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+		task.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 
 		env.getCheckpointLatch().await();
 
@@ -565,7 +565,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		restoredTaskHarness.processElement(new StreamRecord<>(7, initialTime + 7));
 
 		// trigger the checkpoint while processing stream elements
-		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forFullCheckpoint());
+		restoredTask.triggerCheckpoint(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forCheckpoint());
 
 		restoredTaskHarness.processElement(new StreamRecord<>(8, initialTime + 8));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index b5ea866..40303ce 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -287,7 +287,7 @@ public class BarrierBufferAlignmentLimitTest {
 	}
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forCheckpoint()), channel);
 	}
 
 	private static void check(BufferOrEvent expected, BufferOrEvent present) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 49d07b1..c801d09 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -153,7 +153,7 @@ public class BarrierBufferMassiveRandomTest {
 
 			if (barrierGens[currentChannel].isNextBarrier()) {
 				return new BufferOrEvent(
-						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()),
+						new CheckpointBarrier(++currentBarriers[currentChannel], System.currentTimeMillis(), CheckpointOptions.forCheckpoint()),
 							currentChannel);
 			} else {
 				Buffer buffer = bufferPools[currentChannel].requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 491b23d..baac79c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1391,7 +1391,7 @@ public class BarrierBufferTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long checkpointId, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index cde9010..9251ab5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -471,7 +471,7 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forFullCheckpoint()), channel);
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forCheckpoint()), channel);
 	}
 
 	private static BufferOrEvent createCancellationBarrier(long id, int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
index a87e440..81b3130 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/BlockingCheckpointsTest.java
@@ -302,7 +302,7 @@ public class BlockingCheckpointsTest {
 
 		@Override
 		protected void run() throws Exception {
-			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forFullCheckpoint(), new CheckpointMetrics());
+			triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forCheckpoint(), new CheckpointMetrics());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index d60966a..1fdd922 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -355,7 +355,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -374,14 +374,14 @@ public class OneInputStreamTaskTest extends TestLogger {
 		// we should not yet see the barrier, only the two elements from non-blocked input
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 
@@ -415,7 +415,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -436,24 +436,24 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 
@@ -502,7 +502,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
 
-		while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())) {}
+		while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint())) {}
 
 		// since no state was set, there shouldn't be restore calls
 		assertEquals(0, TestingStreamOperator.numberRestoreCalls);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index 4824097..d54bba8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -270,7 +270,7 @@ public class RestoreStreamTaskTest extends TestLogger {
 		long checkpointId = 1L;
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L);
 
-		while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint())) {}
+		while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint())) {}
 
 		environment.getCheckpointLatch().await();
 		assertEquals(checkpointId, environment.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
index b3b0a9f..67512b0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.java
@@ -72,7 +72,7 @@ public class SourceExternalCheckpointTriggerTest {
 		ready.await();
 
 		// now send an external trigger that should be ignored
-		assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(32, 829), CheckpointOptions.forFullCheckpoint()));
+		assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(32, 829), CheckpointOptions.forCheckpoint()));
 
 		// step by step let the source thread emit elements
 		sync.trigger();
@@ -88,7 +88,7 @@ public class SourceExternalCheckpointTriggerTest {
 		verifyNextElement(testHarness.getOutput(), 4L);
 
 		// now send an regular trigger command that should be ignored
-		assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(34, 900), CheckpointOptions.forFullCheckpoint()));
+		assertTrue(sourceTask.triggerCheckpoint(new CheckpointMetaData(34, 900), CheckpointOptions.forCheckpoint()));
 
 		sync.trigger();
 		verifyNextElement(testHarness.getOutput(), 5L);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 8867632..f434933 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -237,7 +237,7 @@ public class SourceStreamTaskTest {
 			for (int i = 0; i < numCheckpoints; i++) {
 				long currentCheckpointId = checkpointId.getAndIncrement();
 				CheckpointMetaData checkpointMetaData = new CheckpointMetaData(currentCheckpointId, 0L);
-				sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+				sourceTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 				Thread.sleep(checkpointInterval);
 			}
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 36bdc05..0a563ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -63,7 +63,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		// tell the task to commence a checkpoint
 		boolean result = task.triggerCheckpoint(new CheckpointMetaData(41L, System.currentTimeMillis()),
-			CheckpointOptions.forFullCheckpoint());
+			CheckpointOptions.forCheckpoint());
 		assertFalse("task triggered checkpoint though not ready", result);
 
 		// a cancellation barrier should be downstream

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index b07b735..4c73e72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -178,7 +178,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 		RUN_LATCH.await();
 
 		// trigger a checkpoint
-		task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, CheckpointOptions.forFullCheckpoint());
+		task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, CheckpointOptions.forCheckpoint());
 
 		// wait until the task has completed execution
 		taskRun.get();

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 525b351..4b1eb3f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -345,7 +345,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
 		try {
-			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+			streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 			fail("Expected test exception here.");
 		} catch (Exception e) {
 			assertEquals(testException, e.getCause());
@@ -412,7 +412,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 
-		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 
 		verify(streamTask).handleAsyncException(anyString(), any(Throwable.class));
 
@@ -498,7 +498,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 
 		acknowledgeCheckpointLatch.await();
 
@@ -625,7 +625,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
-		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 
 		createSubtask.await();
 
@@ -714,7 +714,7 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
 
-		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint());
+		streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpoint());
 		checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
 		streamTask.cancel();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index d785c0d..d2e4dec 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -227,7 +227,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 0);
 
 		// This element should be buffered since we received a checkpoint barrier on
 		// this input
@@ -264,16 +264,16 @@ public class TwoInputStreamTaskTest {
 			expectedOutput,
 			testHarness.getOutput());
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 		testHarness.endInput();
 		testHarness.waitForTaskCompletion();
 
 		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()));
+		expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
@@ -308,7 +308,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke();
 		testHarness.waitForTaskRunning();
 
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 0);
 
 		// These elements should be buffered until we receive barriers from
 		// all inputs
@@ -331,15 +331,15 @@ public class TwoInputStreamTaskTest {
 
 		// Now give a later barrier to all inputs, this should unblock the first channel,
 		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		expectedOutput.add(new CancelCheckpointMarker(0));
 		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forFullCheckpoint()));
+		expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint()));
 
 		testHarness.waitForInputProcessing();
 
@@ -348,9 +348,9 @@ public class TwoInputStreamTaskTest {
 				testHarness.getOutput());
 
 		// Then give the earlier barrier, these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpoint()), 1, 1);
 
 		testHarness.waitForInputProcessing();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 3d1b6fd..13f0b3f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -459,7 +459,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		OperatorSnapshotResult operatorStateResult = operator.snapshotState(
 			checkpointId,
 			timestamp,
-			CheckpointOptions.forFullCheckpoint());
+			CheckpointOptions.forCheckpoint());
 
 		KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
 		KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());

http://git-wip-us.apache.org/repos/asf/flink/blob/fe3b2768/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
index f196c50..6ba5ca4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -221,7 +221,7 @@ public class ExternalizedCheckpointITCase extends TestLogger {
 				NotifyingInfiniteTupleSource.countDownLatch.await();
 
 				externalCheckpoint =
-					cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forFullCheckpoint());
+					cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forCheckpoint());
 
 				cluster.cancelJob(submissionResult.getJobID());
 			}