You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:09 UTC

[13/18] flink git commit: [FLINK-5113] Port functions in tests to new CheckpointedFunction interface.

[FLINK-5113] Port functions in tests to new CheckpointedFunction interface.

This closes #2939.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/525edf1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/525edf1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/525edf1e

Branch: refs/heads/master
Commit: 525edf1e6925b55302d991ddf537a2f16caba21d
Parents: 570dbc8
Author: kl0u <kk...@gmail.com>
Authored: Tue Nov 22 19:23:33 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:22 2017 +0100

----------------------------------------------------------------------
 .../CassandraTupleWriteAheadSinkExample.java    |  22 ++-
 .../fs/RollingSinkFaultToleranceITCase.java     |  18 ++-
 .../BucketingSinkFaultToleranceITCase.java      |  18 ++-
 .../connectors/kafka/KafkaConsumerTestBase.java |  22 ++-
 .../kafka/testutils/FailingIdentityMapper.java  |  18 ++-
 .../testutils/ValidatingExactlyOnceSink.java    |  26 ++--
 .../org/apache/flink/util/CollectionUtil.java   |   2 +-
 .../streaming/api/checkpoint/Checkpointed.java  |   3 +-
 .../runtime/tasks/SourceStreamTaskTest.java     |  12 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  43 ++++--
 .../CoStreamCheckpointingITCase.java            |  74 ++++++----
 .../EventTimeAllWindowCheckpointingITCase.java  |  38 +++--
 .../PartitionedStateCheckpointingITCase.java    |  17 ++-
 .../test/checkpointing/RescalingITCase.java     |  27 ++--
 .../test/checkpointing/SavepointITCase.java     |  14 +-
 .../checkpointing/StateCheckpointedITCase.java  |  94 +++++++-----
 .../StreamCheckpointNotifierITCase.java         |  31 ++--
 .../StreamCheckpointingITCase.java              |  79 ++++++----
 .../UdfStreamOperatorCheckpointingITCase.java   |  14 +-
 .../WindowCheckpointingITCase.java              |  33 +++--
 .../jar/CheckpointedStreamingProgram.java       |  28 ++--
 .../jar/CheckpointingCustomKvStateProgram.java  |  21 +--
 .../jar/LegacyCheckpointedStreamingProgram.java | 148 +++++++++++++++++++
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  38 +++--
 .../JobManagerHACheckpointRecoveryITCase.java   |  41 +++--
 ...erProcessFailureStreamingRecoveryITCase.java |  30 ++--
 26 files changed, 608 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
index 811c410..23de949 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -21,12 +21,14 @@ import com.datastax.driver.core.Cluster;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -50,6 +52,9 @@ public class CassandraTupleWriteAheadSinkExample {
 			.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
 			.enableWriteAheadLog()
 			.setClusterBuilder(new ClusterBuilder() {
+
+				private static final long serialVersionUID = 2793938419775311824L;
+
 				@Override
 				public Cluster buildCluster(Cluster.Builder builder) {
 					return builder.addContactPoint("127.0.0.1").build();
@@ -62,7 +67,9 @@ public class CassandraTupleWriteAheadSinkExample {
 		env.execute();
 	}
 
-	public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
+	public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
+		private static final long serialVersionUID = 4022367939215095610L;
+
 		private int counter = 0;
 		private boolean stop = false;
 
@@ -84,13 +91,16 @@ public class CassandraTupleWriteAheadSinkExample {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return counter;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.counter);
 		}
 
 		@Override
-		public void restoreState(Integer state) throws Exception {
-			this.counter = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.counter = state.get(0);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 36c0d03..2d8492f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -236,7 +238,7 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 	}
 
 	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements CheckpointedAsynchronously<Integer> {
+			implements ListCheckpointed<Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -246,7 +248,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 		private volatile boolean isRunning = true;
 
-
 		StringGeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
@@ -288,13 +289,16 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 54703a3..85f23b6 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -233,7 +235,7 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 	}
 
 	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements CheckpointedAsynchronously<Integer> {
+			implements ListCheckpointed<Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -243,7 +245,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 
 		private volatile boolean isRunning = true;
 
-
 		StringGeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
@@ -285,13 +286,16 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index aa7ea49..d7fab88 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -51,21 +51,16 @@ import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -73,7 +68,6 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -1925,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 
 	public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
-			implements Checkpointed<Integer>, CheckpointListener {
+			implements ListCheckpointed<Integer>, CheckpointListener {
 
 		private static final long serialVersionUID = 6334389850158707313L;
 
@@ -1939,7 +1933,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		private boolean failer;
 		private boolean hasBeenCheckpointed;
 
-
 		public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
 			this.shutdownBrokerId = shutdownBrokerId;
 			this.failCount = failCount;
@@ -1994,13 +1987,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsTotal;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.numElementsTotal);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			this.numElementsTotal = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.numElementsTotal = state.get(0);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index 2bd400c..ec64b00 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
+
 
 public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
-		Checkpointed<Integer>, CheckpointListener, Runnable {
+	ListCheckpointed<Integer>, CheckpointListener, Runnable {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
 	
@@ -89,13 +92,16 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
 	}
 
 	@Override
-	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-		return numElementsTotal;
+	public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+		return Collections.singletonList(numElementsTotal);
 	}
 
 	@Override
-	public void restoreState(Integer state) {
-		numElementsTotal = state;
+	public void restoreState(List<Integer> state) throws Exception {
+		if (state.isEmpty() || state.size() > 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		}
+		this.numElementsTotal = state.get(0);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 7813561..46e70fd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -19,15 +19,17 @@
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.test.util.SuccessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
 
-public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
 
@@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
 
 	private int numElements; // this is checkpointed
 
-	
 	public ValidatingExactlyOnceSink(int numElementsTotal) {
 		this.numElementsTotal = numElementsTotal;
 	}
@@ -68,15 +69,20 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
 	}
 
 	@Override
-	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
-		LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
-		return new Tuple2<>(numElements, duplicateChecker);
+	public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
+		LOG.info("Snapshot of counter " + numElements + " at checkpoint " + checkpointId);
+		return Collections.singletonList(new Tuple2<>(numElements, duplicateChecker));
 	}
 
 	@Override
-	public void restoreState(Tuple2<Integer, BitSet> state) {
-		LOG.info("restoring num elements to {}", state.f0);
-		this.numElements = state.f0;
-		this.duplicateChecker = state.f1;
+	public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
+		if (state.isEmpty() || state.size() > 1) {
+			throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+		}
+
+		Tuple2<Integer, BitSet> s = state.get(0);
+		LOG.info("restoring num elements to {}", s.f0);
+		this.numElements = s.f0;
+		this.duplicateChecker = s.f1;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index 15d00ae..cd5c2e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -34,4 +34,4 @@ public final class CollectionUtil {
 	public static boolean isNullOrEmpty(Map<?, ?> map) {
 		return map == null || map.isEmpty();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 7af5cea..fb67ea7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -31,8 +31,7 @@ import java.io.Serializable;
  * state is written, the function is not called, so the function needs not return a
  * copy of its state, but may return a reference to its state. Functions that can
  * continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
+ * can implement the {@link CheckpointedAsynchronously} interface.</p>
  * 
  * @param <T> The type of the operator state.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index b592fe8..dd1fe58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -34,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -135,7 +136,7 @@ public class SourceStreamTaskTest {
 		}
 	}
 
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
+	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
 		private int maxElements;
@@ -181,7 +182,7 @@ public class SourceStreamTaskTest {
 		}
 
 		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception {
 			if (!semaphore.tryAcquire()) {
 				Assert.fail("Concurrent invocation of snapshotState.");
 			}
@@ -199,11 +200,12 @@ public class SourceStreamTaskTest {
 				Assert.fail("Count is different at start end end of snapshot.");
 			}
 			semaphore.release();
-			return sum;
+			return Collections.<Serializable>singletonList(sum);
 		}
 
 		@Override
-		public void restoreState(Serializable state) {}
+		public void restoreState(List<Serializable> state) throws Exception {
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 583e42f..1911f44 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -53,7 +53,9 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -495,7 +497,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements Checkpointed<Integer>, CheckpointListener
+			implements ListCheckpointed<Integer>, CheckpointListener
 	{
 		private static volatile boolean failedBefore = false;
 
@@ -567,13 +569,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsEmitted;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.numElementsEmitted);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			numElementsEmitted = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.numElementsEmitted = state.get(0);
 		}
 
 		public static void reset() {
@@ -582,7 +587,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements Checkpointed<HashMap<Long, Integer>> {
+			implements ListCheckpointed<HashMap<Long, Integer>> {
 
 		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
 
@@ -676,19 +681,22 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		}
 
 		@Override
-		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return this.windowCounts;
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.windowCounts);
 		}
 
 		@Override
-		public void restoreState(HashMap<Long, Integer> state) {
-			this.windowCounts.putAll(state);
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			windowCounts.putAll(state.get(0));
 		}
 	}
 
 	// Sink for validating the stateful window counts
 	private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements Checkpointed<HashMap<Long, Integer>> {
+			implements ListCheckpointed<HashMap<Long, Integer>> {
 
 		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
 
@@ -757,13 +765,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		}
 
 		@Override
-		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return this.windowCounts;
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.windowCounts);
 		}
 
 		@Override
-		public void restoreState(HashMap<Long, Integer> state) {
-			this.windowCounts.putAll(state);
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.windowCounts.putAll(state.get(0));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 51a00b9..06d3ab0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -38,6 +38,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -142,11 +144,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 	/**
 	 * A generating source that is slow before the first two checkpoints went through
 	 * and will indefinitely stall at a certain point to allow the checkpoint to complete.
-	 * 
+	 *
 	 * After the checkpoints are through, it continues with full speed.
 	 */
 	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements Checkpointed<Integer>, CheckpointListener {
+			implements ListCheckpointed<Integer>, CheckpointListener {
 
 		private static volatile int numCompletedCheckpoints = 0;
 
@@ -210,13 +212,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 
 		@Override
@@ -239,12 +244,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 	}
 
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 		
 		private long count;
-		
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
@@ -258,13 +262,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 
@@ -303,7 +310,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 
@@ -321,18 +328,21 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> {
+		
 		static final long[] counts = new long[PARALLELISM];
 
 		private long count;
@@ -344,13 +354,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 
 		@Override
@@ -359,7 +372,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 	}
 
-	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
+	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 
@@ -378,13 +391,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index b493e42..09c1437 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -43,14 +43,16 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.*;
 
 /**
- * This verfies that checkpointing works correctly with event time windows.
+ * This verifies that checkpointing works correctly with event time windows.
  *
  * <p>
  * This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
@@ -432,7 +434,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements Checkpointed<Integer>, CheckpointListener
+			implements ListCheckpointed<Integer>, CheckpointListener
 	{
 		private static volatile boolean failedBefore = false;
 
@@ -502,23 +504,26 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			numSuccessfulCheckpoints++;
 		}
 
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsEmitted;
+		public static void reset() {
+			failedBefore = false;
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			numElementsEmitted = state;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.numElementsEmitted);
 		}
 
-		public static void reset() {
-			failedBefore = false;
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.numElementsEmitted = state.get(0);
 		}
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements Checkpointed<HashMap<Long, Integer>> {
+			implements ListCheckpointed<HashMap<Long, Integer>> {
 
 		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
 
@@ -612,13 +617,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		}
 
 		@Override
-		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return this.windowCounts;
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.windowCounts);
 		}
 
 		@Override
-		public void restoreState(HashMap<Long, Integer> state) {
-			this.windowCounts.putAll(state);
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.windowCounts.putAll(state.get(0));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 0728b41..4761d70 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
@@ -33,7 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -86,7 +88,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 	// --------------------------------------------------------------------------------------------
 
 	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> 
-		implements Checkpointed<Integer> {
+		implements ListCheckpointed<Integer> {
 
 		private final long numElements;
 
@@ -133,13 +135,16 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bd1678e..8045d82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -66,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -763,7 +763,7 @@ public class RescalingITCase extends TestLogger {
 		}
 	}
 
-	private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements Checkpointed<Integer> {
+	private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements ListCheckpointed<Integer> {
 
 		private static final long serialVersionUID = 8388073059042040203L;
 
@@ -772,13 +772,16 @@ public class RescalingITCase extends TestLogger {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return counter;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.counter);
 		}
 
 		@Override
-		public void restoreState(Integer state) throws Exception {
-			counter = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.counter = state.get(0);
 		}
 	}
 
@@ -879,18 +882,20 @@ public class RescalingITCase extends TestLogger {
 		}
 	}
 
-	private static class NonPartitionedStateSource extends StateSourceBase implements Checkpointed<Integer> {
+	private static class NonPartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> {
 
 		private static final long serialVersionUID = -8108185918123186841L;
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return counter;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.counter);
 		}
 
 		@Override
-		public void restoreState(Integer state) throws Exception {
-			counter = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (!state.isEmpty()) {
+				this.counter = state.get(0);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9f957e5..77777d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -63,7 +63,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -527,7 +526,7 @@ public class SavepointITCase extends TestLogger {
 
 	private static class StatefulCounter
 		extends RichMapFunction<Integer, Integer>
-		implements Checkpointed<byte[]>, CheckpointListener {
+		implements ListCheckpointed<byte[]>, CheckpointListener {
 
 		private static final Object checkpointLock = new Object();
 		private static int numCompleteCalls;
@@ -556,13 +555,16 @@ public class SavepointITCase extends TestLogger {
 		}
 
 		@Override
-		public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return data;
+		public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(data);
 		}
 
 		@Override
-		public void restoreState(byte[] data) throws Exception {
-			this.data = data;
+		public void restoreState(List<byte[]> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.data = state.get(0);
 
 			synchronized (checkpointLock) {
 				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 9d37b59..32d9e23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -34,9 +33,10 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue;
  * A simple test that runs a streaming topology with checkpointing enabled.
  *
  * The test triggers a failure after a while and verifies that, after completion, the
- * state defined with either the {@link ValueState} or the {@link Checkpointed}
+ * state defined with either the {@link ValueState} or the {@link ListCheckpointed}
  * interface reflects the "exactly once" semantics.
  * 
  * The test throttles the input until at least two checkpoints are completed, to make sure that
@@ -139,15 +139,14 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> 
-			implements CheckpointedAsynchronously<Integer>
+			implements ListCheckpointed<Integer>
 	{
 		private final long numElements;
 
 		private int index;
 
 		private volatile boolean isRunning = true;
-		
-		
+
 		StringGeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
@@ -197,23 +196,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 	}
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> 
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 		
 		private long count;
-		
+
 		@Override
 		public boolean filter(String value) throws Exception {
 			count++;
@@ -226,23 +228,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> 
-			implements CheckpointedAsynchronously<Long> {
+			implements ListCheckpointed<Long> {
 		
 		static final long[] counts = new long[PARALLELISM];
 
 		private long count;
-		
+
 		@Override
 		public PrefixCount map(String value) {
 			count++;
@@ -255,18 +260,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 	
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<Long> {
+		implements ListCheckpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 		
@@ -284,18 +292,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 	
 	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointListener {
+		implements ListCheckpointed<HashMap<String, PrefixCount>>, CheckpointListener {
 
 		static boolean wasCheckpointedBeforeFailure = false;
 		
@@ -307,7 +318,6 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		private long count;
 		
 		private boolean wasCheckpointed;
-		
 
 		OnceFailingAggregator(long failurePos) {
 			this.failurePos = failurePos;
@@ -339,13 +349,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return aggregationMap;
+		public List<HashMap<String, PrefixCount>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.aggregationMap);
 		}
 
 		@Override
-		public void restoreState(HashMap<String, PrefixCount> state) {
-			aggregationMap.putAll(state);
+		public void restoreState(List<HashMap<String, PrefixCount>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.aggregationMap.putAll(state.get(0));
 		}
 
 		@Override
@@ -355,7 +368,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 
-			implements Checkpointed<HashMap<Character, Long>> {
+			implements ListCheckpointed<HashMap<Character, Long>> {
 
 		@SuppressWarnings("unchecked")
 		private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
@@ -379,13 +392,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return counts;
+		public List<HashMap<Character, Long>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.counts);
 		}
 
 		@Override
-		public void restoreState(HashMap<Character, Long> state) {
-			counts.putAll(state);
+		public void restoreState(List<HashMap<Character, Long>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.counts.putAll(state.get(0));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 6bf511f..be3fac5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -205,7 +206,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 	 * interface it stores all the checkpoint ids it has seen in a static list.
 	 */
 	private static class GeneratingSourceFunction extends RichSourceFunction<Long>
-			implements ParallelSourceFunction<Long>, CheckpointListener, Checkpointed<Integer> {
+			implements ParallelSourceFunction<Long>, CheckpointListener, ListCheckpointed<Integer> {
 		
 		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
 		
@@ -263,13 +264,16 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 
 		@Override
@@ -390,7 +394,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 	 * Reducer that causes one failure between seeing 40% to 70% of the records.
 	 */
 	private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>> 
-		implements Checkpointed<Long>, CheckpointListener
+		implements ListCheckpointed<Long>, CheckpointListener
 	{
 		static volatile boolean hasFailed = false;
 		static volatile long failureCheckpointID;
@@ -402,7 +406,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 		private volatile long count;
 
 		private volatile boolean notificationAlready;
-		
+
 		OnceFailingReducer(long numElements) {
 			this.failurePos = (long) (0.5 * numElements / PARALLELISM);
 		}
@@ -419,19 +423,22 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
 			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
 				LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<");
 				hasFailed = true;
 				failureCheckpointID = checkpointId;
 				throw new Exception("Test Failure");
 			}
-			return count;
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index b97e1f2..aae04c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +43,7 @@ import static org.junit.Assert.assertEquals;
  * A simple test that runs a streaming topology with checkpointing enabled.
  * 
  * The test triggers a failure after a while and verifies that, after completion, the
- * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics.
+ * state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics.
  */
 @SuppressWarnings("serial")
 public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@@ -118,7 +120,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+			implements ParallelSourceFunction<String>, ListCheckpointed<Integer> {
 
 		private final long numElements;
 		
@@ -131,7 +133,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		private volatile boolean isRunning = true;
 
 		static final long[] counts = new long[PARALLELISM];
-		
+
 		@Override
 		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
@@ -186,17 +188,20 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.index);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			index = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.index = state.get(0);
 		}
 	}
 	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> {
 
 		private long count;
 		static final long[] counts = new long[PARALLELISM];
@@ -213,13 +218,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 
@@ -227,7 +235,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	 * This function uses simultaneously the key/value state and is checkpointed.
 	 */
 	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> 
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 		
 		private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
 		static final long[] counts = new long[PARALLELISM];
@@ -238,7 +246,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		
 		private long failurePos;
 		private long count;
-		
+
 		private ValueState<Long> pCount;
 		private long inputCount;
 
@@ -279,22 +287,25 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return inputCount;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.inputCount);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			inputCount = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.inputCount = state.get(0);
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
 		
 		static final long[] counts = new long[PARALLELISM];
 
 		private long count;
-		
+
 		@Override
 		public boolean filter(String value) {
 			count++;
@@ -307,23 +318,26 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 		
 		static final long[] counts = new long[PARALLELISM];
 
 		private long count;
-		
+
 		@Override
 		public PrefixCount map(String value) throws IOException {
 			count++;
@@ -336,13 +350,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 5874f56..dae5cd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -146,7 +145,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	 * augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
 	 */
 	private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>
-			implements Checkpointed<Long> {
+			implements ListCheckpointed<Long> {
 
 		private long count;
 
@@ -168,13 +167,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 		public void cancel() {}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.count);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void restoreState(List<Long> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.count = state.get(0);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index e424a8d..a45349d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -46,7 +46,9 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -310,7 +312,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements Checkpointed<Integer>, CheckpointListener
+			implements ListCheckpointed<Integer>, CheckpointListener
 	{
 		private static volatile boolean failedBefore = false;
 
@@ -373,13 +375,16 @@ public class WindowCheckpointingITCase extends TestLogger {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return numElementsEmitted;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.numElementsEmitted);
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			numElementsEmitted = state;
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.numElementsEmitted = state.get(0);
 		}
 
 		public static void reset() {
@@ -388,7 +393,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>>
-			implements Checkpointed<HashMap<Long, Integer>> {
+			implements ListCheckpointed<HashMap<Long, Integer>> {
 
 		private final HashMap<Long, Integer> counts = new HashMap<>();
 
@@ -439,18 +444,20 @@ public class WindowCheckpointingITCase extends TestLogger {
 		}
 
 		@Override
-		public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
-			return this.counts;
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.counts);
 		}
 
 		@Override
-		public void restoreState(HashMap<Long, Integer> state) {
-			this.counts.putAll(state);
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.counts.putAll(state.get(0));
 
-			for (Integer i : state.values()) {
+			for (Integer i : state.get(0).values()) {
 				this.aggCount += i;
 			}
-
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index f91582f..52a3ba8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -21,13 +21,15 @@ package org.apache.flink.test.classloading.jar;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.lang.RuntimeException;
+import java.util.Collections;
+import java.util.List;
 
 /**
  * A simple streaming program, which is using the state checkpointing of Flink.
@@ -57,7 +59,7 @@ public class CheckpointedStreamingProgram {
 
 
 	// with Checkpoining
-	public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+	public static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> {
 		public boolean running = true;
 
 		@Override
@@ -74,32 +76,36 @@ public class CheckpointedStreamingProgram {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return null;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.emptyList();
 		}
 
 		@Override
-		public void restoreState(Integer state) {
+		public void restoreState(List<Integer> state) throws Exception {
 
 		}
 	}
 
-	public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+	public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {
 
 		private String someState;
 		private boolean atLeastOneSnapshotComplete = false;
 		private boolean restored = false;
 
 		@Override
-		public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return this;
+		public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this);
 		}
 
 		@Override
-		public void restoreState(StatefulMapper state) {
+		public void restoreState(List<StatefulMapper> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
 			restored = true;
-			this.someState = state.someState;
-			this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+			StatefulMapper s = state.get(0);
+			this.someState = s.someState;
+			this.atLeastOneSnapshotComplete = s.atLeastOneSnapshotComplete;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index 6796cb0..d3baa7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -42,6 +42,8 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class CheckpointingCustomKvStateProgram {
@@ -84,7 +86,7 @@ public class CheckpointingCustomKvStateProgram {
 		env.execute();
 	}
 
-	private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, Checkpointed<Integer> {
+	private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, ListCheckpointed<Integer> {
 		private static final long serialVersionUID = -7517574288730066280L;
 		private volatile boolean running = true;
 
@@ -104,17 +106,18 @@ public class CheckpointingCustomKvStateProgram {
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return 0;
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(0);
 		}
 
 		@Override
-		public void restoreState(Integer state) throws Exception {
+		public void restoreState(List<Integer> state) throws Exception {
 
 		}
 	}
 
-	private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> implements Checkpointed<ReducingStateFlatMap>, CheckpointListener {
+	private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>
+			implements ListCheckpointed<ReducingStateFlatMap>, CheckpointListener {
 
 		private static final long serialVersionUID = -5939722892793950253L;
 		private transient ReducingState<Integer> kvState;
@@ -148,12 +151,12 @@ public class CheckpointingCustomKvStateProgram {
 		}
 
 		@Override
-		public ReducingStateFlatMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return this;
+		public List<ReducingStateFlatMap> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this);
 		}
 
 		@Override
-		public void restoreState(ReducingStateFlatMap state) throws Exception {
+		public void restoreState(List<ReducingStateFlatMap> state) throws Exception {
 			restored = true;
 			atLeastOneSnapshotComplete = true;
 		}