You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:30 UTC

[03/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index e5a1c23..c503a1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -48,11 +47,6 @@ import static org.junit.Assert.assertTrue;
  * this barriers are correctly forwarded.
  *
  * <p>
- * This uses a mixture of Operators with the {@link Checkpointed} interface and the new
- * {@link org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext#getOperatorState}
- * method.
- *
- * <p>
  * The test triggers a failure after a while and verifies that, after completion, the
  * state reflects the "exactly once" semantics.
  */
@@ -142,25 +136,21 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements  ParallelSourceFunction<String> {
+			implements ParallelSourceFunction<String>, Checkpointed<Integer> {
 
+		static final long[] counts = new long[PARALLELISM];
+		
 		private final long numElements;
 
 		private Random rnd;
 		private StringBuilder stringBuilder;
 
-		private OperatorState<Integer> index;
+		private int index;
 		private int step;
 
-		private volatile boolean isRunning;
-
-		static final long[] counts = new long[PARALLELISM];
-		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
-		}
-
+		private volatile boolean isRunning = true;
 
+		
 		StringGeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
 		}
@@ -169,20 +159,19 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		public void open(Configuration parameters) throws IOException {
 			rnd = new Random();
 			stringBuilder = new StringBuilder();
+			
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-
-			index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false);
-
-			isRunning = true;
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
 		}
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.value() < numElements) {
-				char first = (char) ((index.value() % 40) + 40);
+			while (isRunning && index < numElements) {
+				char first = (char) ((index % 40) + 40);
 
 				stringBuilder.setLength(0);
 				stringBuilder.append(first);
@@ -190,13 +179,18 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 				String result = randomString(stringBuilder, rnd);
 
 				synchronized (lockingObject) {
-					index.update(index.value() + step);
+					index += step;
 					ctx.collect(result);
 				}
 			}
 		}
 
 		@Override
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+		}
+		
+		@Override
 		public void cancel() {
 			isRunning = false;
 		}
@@ -211,29 +205,46 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			return bld.toString();
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
 	}
 
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
+			implements Checkpointed<Long> {
 
-		private OperatorState<Long> count;
 		static final long[] counts = new long[PARALLELISM];
+		
+		private long count;
+		
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
-			count.update(count.value() + 1);
+			count++;
 			return value;
 		}
 
 		@Override
-		public void open(Configuration conf) throws IOException {
-			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 
 		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
 		}
 
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
 	}
 
 	private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 108e1e6..0fcedda 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -83,20 +83,21 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 	// Custom Functions
 	// --------------------------------------------------------------------------------------------
 
-	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> {
+	private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> 
+		implements Checkpointed<Integer> {
 
 		private final long numElements;
 
-		private OperatorState<Integer> index;
+		private int index;
 		private int step;
 
-		private volatile boolean isRunning;
+		private volatile boolean isRunning = true;
 
 		static final long[] counts = new long[PARALLELISM];
 
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
 		}
 
 		IntGeneratingSourceFunction(long numElements) {
@@ -106,22 +107,18 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		@Override
 		public void open(Configuration parameters) throws IOException {
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			index = getRuntimeContext().getOperatorState("index",
-					getRuntimeContext().getIndexOfThisSubtask(), false);
-
-			isRunning = true;
+			index = getRuntimeContext().getIndexOfThisSubtask();
 		}
 
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.value() < numElements) {
+			while (isRunning && index < numElements) {
 
 				synchronized (lockingObject) {
-					index.update(index.value() + step);
-					ctx.collect(index.value() % 40);
+					index += step;
+					ctx.collect(index % 40);
 				}
 			}
 		}
@@ -130,11 +127,22 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		public void cancel() {
 			isRunning = false;
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
 	}
 
 	private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {
 
 		private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
+		
 		private static volatile boolean hasFailed = false;
 
 		private final long numElements;
@@ -157,7 +165,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
-			sum = getRuntimeContext().getOperatorState("sum", 0L, true);
+			sum = getRuntimeContext().getKeyValueState(Long.class, 0L);
 		}
 
 		@Override
@@ -183,20 +191,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			counts = getRuntimeContext().getOperatorState("count", NonSerializableLong.of(0L), true,
-					new StateCheckpointer<NonSerializableLong, String>() {
-
-						@Override
-						public String snapshotState(NonSerializableLong state, long id, long ts) {
-							return state.value.toString();
-						}
-
-						@Override
-						public NonSerializableLong restoreState(String stateSnapshot) {
-							return NonSerializableLong.of(Long.parseLong(stateSnapshot));
-						}
-
-					});
+			counts = getRuntimeContext().getKeyValueState(NonSerializableLong.class, NonSerializableLong.of(0L));
 		}
 
 		@Override
@@ -204,7 +199,6 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 			long currentCount = counts.value().value + 1;
 			counts.update(NonSerializableLong.of(currentCount));
 			allCounts.put(value.f0, currentCount);
-
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 8a75de5..f517f83 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -265,28 +265,33 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
+		implements Checkpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
 		
-		private OperatorState<Long> count;
+		private long count;
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
-			count.update(count.value() + 1);
+			count++;
 			return value;
 		}
 
 		@Override
-		public void open(Configuration conf) throws IOException {
-			count = getRuntimeContext().getOperatorState("count", 0L, false);
+		public void close() throws IOException {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 
 		@Override
-		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
 		}
-		
 	}
 	
 	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 270cfaa..08af93a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -101,14 +100,16 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
 	@Override
 	public void postSubmit() {
-		List[][] checkList = new List[][]{	GeneratingSourceFunction.completedCheckpoints,
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		List<Long>[][] checkList = new List[][] {
+				GeneratingSourceFunction.completedCheckpoints,
 				IdentityMapFunction.completedCheckpoints,
 				LongRichFilterFunction.completedCheckpoints,
 				LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
 
 		long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
 
-		for(List[] parallelNotifications : checkList) {
+		for(List<Long>[] parallelNotifications : checkList) {
 			for (int i = 0; i < PARALLELISM; i++){
 				List<Long> notifications = parallelNotifications[i];
 				assertTrue("No checkpoint notification was received.",
@@ -134,21 +135,23 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 	 * interface it stores all the checkpoint ids it has seen in a static list.
 	 */
 	private static class GeneratingSourceFunction extends RichSourceFunction<Long>
-			implements  ParallelSourceFunction<Long>, CheckpointNotifier {
+			implements ParallelSourceFunction<Long>, CheckpointNotifier, Checkpointed<Integer> {
 
-		// operator life cycle
-		private volatile boolean isRunning;
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+		
 
 		// operator behaviour
 		private final long numElements;
 		private long result;
 
-		private OperatorState<Integer> index;
+		private int index;
 		private int step;
 
 		// test behaviour
 		private int subtaskId;
-		public static List[] completedCheckpoints = new List[PARALLELISM];
+
+		private volatile boolean isRunning = true;
 
 		GeneratingSourceFunction(long numElements) {
 			this.numElements = numElements;
@@ -158,26 +161,27 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		public void open(Configuration parameters) throws IOException {
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
 			subtaskId = getRuntimeContext().getIndexOfThisSubtask();
-			index = getRuntimeContext().getOperatorState("index", subtaskId, false);
+			
+			if (index == 0) {
+				index = subtaskId;
+			}
 
 			// Create a collection on the first open
 			if (completedCheckpoints[subtaskId] == null) {
-				completedCheckpoints[subtaskId] = new ArrayList();
+				completedCheckpoints[subtaskId] = new ArrayList<>();
 			}
-
-			isRunning = true;
 		}
 
 		@Override
 		public void run(SourceContext<Long> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.value() < numElements) {
+			while (isRunning && index < numElements) {
 
-				result = index.value() % 10;
+				result = index % 10;
 
 				synchronized (lockingObject) {
-					index.update(index.value() + step);
+					index += step;
 					ctx.collect(result);
 				}
 			}
@@ -192,6 +196,16 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {
 			completedCheckpoints[subtaskId].add(checkpointId);
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
 	}
 
 	/**
@@ -201,7 +215,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 	private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
 			implements CheckpointNotifier {
 
-		public static List[] completedCheckpoints = new List[PARALLELISM];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+		
 		private int subtaskId;
 
 		@Override
@@ -215,7 +231,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
 			// Create a collection on the first open
 			if (completedCheckpoints[subtaskId] == null) {
-				completedCheckpoints[subtaskId] = new ArrayList();
+				completedCheckpoints[subtaskId] = new ArrayList<>();
 			}
 		}
 
@@ -283,7 +299,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 	private static class LongRichFilterFunction extends RichFilterFunction<Long>
 			implements CheckpointNotifier {
 
-		public static List[] completedCheckpoints = new List[PARALLELISM];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+		
 		private int subtaskId;
 
 		@Override
@@ -297,7 +315,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
 			// Create a collection on the first open
 			if (completedCheckpoints[subtaskId] == null) {
-				completedCheckpoints[subtaskId] = new ArrayList();
+				completedCheckpoints[subtaskId] = new ArrayList<>();
 			}
 		}
 
@@ -315,7 +333,8 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 	private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
 			implements CheckpointNotifier {
 
-		public static List[] completedCheckpoints = new List[PARALLELISM];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
 		private int subtaskId;
 
 		@Override
@@ -324,7 +343,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
 
 			// Create a collection on the first open
 			if (completedCheckpoints[subtaskId] == null) {
-				completedCheckpoints[subtaskId] = new ArrayList();
+				completedCheckpoints[subtaskId] = new ArrayList<>();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 0804d53..992a679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -36,7 +35,6 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -62,14 +60,15 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		stream
 				// -------------- first vertex, chained to the source ----------------
-				.filter(new StringRichFilterFunction()).shuffle()
+				.filter(new StringRichFilterFunction())
+				.shuffle()
 
 				// -------------- seconds vertex - the stateful one that also fails ----------------
 				.map(new StringPrefixCountRichMapFunction())
 				.startNewChain()
 				.map(new StatefulCounterFunction())
 
-						// -------------- third vertex - counter and the sink ----------------
+				// -------------- third vertex - counter and the sink ----------------
 				.keyBy("prefix")
 				.map(new OnceFailingPrefixCounter(NUM_STRINGS))
 				.addSink(new SinkFunction<PrefixCount>() {
@@ -118,22 +117,23 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements  ParallelSourceFunction<String> {
+			implements ParallelSourceFunction<String>, Checkpointed<Integer> {
 
 		private final long numElements;
 		
-		private Random rnd;
-		private StringBuilder stringBuilder;
+		private final Random rnd = new Random();
+		private final StringBuilder stringBuilder = new StringBuilder();
 
-		private OperatorState<Integer> index;
+		private int index;
 		private int step;
 
-		private volatile boolean isRunning;
+		private volatile boolean isRunning = true;
 
 		static final long[] counts = new long[PARALLELISM];
+		
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
 		}
 
 
@@ -143,22 +143,18 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			rnd = new Random();
-			stringBuilder = new StringBuilder();
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			
-			
-			index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false);
-			
-			isRunning = true;
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
 		}
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.value() < numElements) {
-				char first = (char) ((index.value() % 40) + 40);
+			while (isRunning && index < numElements) {
+				char first = (char) ((index % 40) + 40);
 
 				stringBuilder.setLength(0);
 				stringBuilder.append(first);
@@ -166,7 +162,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 				String result = randomString(stringBuilder, rnd);
 
 				synchronized (lockingObject) {
-					index.update(index.value() + step);
+					index += step;
 					ctx.collect(result);
 				}
 			}
@@ -187,6 +183,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			return bld.toString();
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+			return index;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			index = state;
+		}
 	}
 	
 	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
@@ -215,8 +221,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			count = state;
 		}
 	}
-	
-	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> {
+
+	/**
+	 * This function uses simultaneously the key/value state and is checkpointed.
+	 */
+	private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> 
+			implements Checkpointed<Long> {
 		
 		private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
 		static final long[] counts = new long[PARALLELISM];
@@ -229,7 +239,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		private long count;
 		
 		private OperatorState<Long> pCount;
-		private OperatorState<Long> inputCount;
+		private long inputCount;
 
 		OnceFailingPrefixCounter(long numElements) {
 			this.numElements = numElements;
@@ -242,13 +252,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
-			pCount = getRuntimeContext().getOperatorState("prefix-count", 0L, true);
-			inputCount = getRuntimeContext().getOperatorState("input-count", 0L, false);
+			pCount = getRuntimeContext().getKeyValueState(Long.class, 0L);
 		}
 		
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount.value();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount;
 		}
 
 		@Override
@@ -256,9 +265,9 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			count++;
 			if (!hasFailed && count >= failurePos) {
 				hasFailed = true;
-				throw new Exception("Test Failure");
+//				throw new Exception("Test Failure");
 			}
-			inputCount.update(inputCount.value() + 1);
+			inputCount++;
 		
 			long currentPrefixCount = pCount.value() + value.count;
 			pCount.update(currentPrefixCount);
@@ -266,12 +275,23 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 			value.count = currentPrefixCount;
 			return value;
 		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return inputCount;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			inputCount = state;
+		}
 	}
 
 	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
-
-		Long count = 0L;
+		
 		static final long[] counts = new long[PARALLELISM];
+
+		private long count;
 		
 		@Override
 		public boolean filter(String value) {
@@ -285,7 +305,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 		}
 
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
 			return count;
 		}
 
@@ -296,49 +316,31 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 	}
 
 	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
-			implements Checkpointed<Integer> {
-
-		OperatorState<Long> count;
+			implements Checkpointed<Long> {
+		
 		static final long[] counts = new long[PARALLELISM];
+
+		private long count;
 		
 		@Override
 		public PrefixCount map(String value) throws IOException {
-			count.update(count.value() + 1);
+			count++;
 			return new PrefixCount(value.substring(0, 1), value, 1L);
 		}
-		
-		@Override
-		public void open(Configuration conf) throws IOException {
-			this.count = getRuntimeContext().getOperatorState("count", 0L, false,
-					new StateCheckpointer<Long, String>() {
-
-						@Override
-						public String snapshotState(Long state, long id, long ts) {
-							return state.toString();
-						}
-
-						@Override
-						public Long restoreState(String stateSnapshot) {
-							return Long.parseLong(stateSnapshot);
-						}
-
-					});
-		}
 
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 
 		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return null;
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
 		}
 
 		@Override
-		public void restoreState(Integer state) {
-			// verify that we never store/restore null state
-			fail();
+		public void restoreState(Long state) {
+			count = state;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index cb02d2f..c12bcb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -22,10 +22,10 @@ import com.google.common.collect.EvictingQueue;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -142,32 +142,36 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	 * Produces a sequence multiple times for each parallelism instance of downstream operators,
 	 * augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
 	 */
-	private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>{
+	private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>
+			implements Checkpointed<Long> {
 
-		private transient OperatorState<Long> count;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			count = getRuntimeContext().getOperatorState("count", 0L, false);
-		}
+		private long count;
 
 		@Override
 		public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
 			Object lock = ctx.getCheckpointLock();
 
-			while (count.value() < NUM_INPUT){
+			while (count < NUM_INPUT){
 				synchronized (lock){
 					for (int i = 0; i < PARALLELISM; i++) {
-						ctx.collect(Tuple2.of(i, count.value() + 1));
+						ctx.collect(Tuple2.of(i, count + 1));
 					}
-					count.update(count.value() + 1);
+					count++;
 				}
 			}
 		}
 
 		@Override
-		public void cancel() {
+		public void cancel() {}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
 		}
 	}
 
@@ -175,14 +179,15 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 	 * Mapper that causes one failure between seeing 40% to 70% of the records.
 	 */
 	private static class OnceFailingIdentityMapFunction
-			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+			extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> 
+			implements Checkpointed<Long> {
 
 		private static volatile boolean hasFailed = false;
 
 		private final long numElements;
 
 		private long failurePos;
-		private OperatorState<Long> count;
+		private long count;
 
 		public OnceFailingIdentityMapFunction(long numElements) {
 			this.numElements = numElements;
@@ -194,19 +199,28 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
 			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = getRuntimeContext().getOperatorState("count", 0L, false);
 		}
 
 		@Override
 		public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
-			if (!hasFailed && count.value() >= failurePos) {
+			if (!hasFailed && count >= failurePos) {
 				hasFailed = true;
 				throw new Exception("Test Failure");
 			}
-			count.update(count.value() + 1);
+			count++;
 			return value;
 		}
 
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index dab6a6d..989db14 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import java.io.File;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Rule;
@@ -55,7 +56,8 @@ public class ClassLoaderITCase {
 
 			// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
 			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-			config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, "file://" + folder.newFolder().getAbsolutePath());
+			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
+					folder.newFolder().getAbsoluteFile().toURI().toString());
 
 			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index e7b1668..054b321 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.test.recovery;
 
-
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
@@ -29,17 +26,19 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+
 import org.junit.Assert;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for streaming program behaviour in case of TaskManager failure
  * based on {@link AbstractProcessFailureRecoveryTest}.
@@ -72,7 +71,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		env.getConfig().disableSysoutLogging();
 		env.setNumberOfExecutionRetries(1);
 		env.enableCheckpointing(200);
-		env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));
+		
+		env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
 
 		DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
 				// add a non-chained no-op map to test the chain state restore logic
@@ -104,7 +104,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 	}
 
-	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
+	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> 
+			implements Checkpointed<Long> {
 
 		private static final long SLEEP_TIME = 50;
 
@@ -113,7 +114,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 
 		private volatile boolean isRunning = true;
 		
-		private OperatorState<Long> collected;
+		private long collected;
 
 		public SleepyDurableGenerateSequence(File coordinateDir, long end) {
 			this.coordinateDir = coordinateDir;
@@ -133,7 +134,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
 			boolean checkForProceedFile = true;
 
-			while (isRunning && collected.value() < toCollect) {
+			while (isRunning && collected < toCollect) {
 				// check if the proceed file exists (then we go full speed)
 				// if not, we always recheck and sleep
 				if (checkForProceedFile) {
@@ -146,21 +147,26 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 				}
 
 				synchronized (checkpointLock) {
-					sourceCtx.collect(collected.value() * stepSize + congruence);
-					collected.update(collected.value() + 1);
+					sourceCtx.collect(collected * stepSize + congruence);
+					collected++;
 				}
 			}
 		}
-		
-		@Override
-		public void open(Configuration conf) throws IOException {
-			collected = getRuntimeContext().getOperatorState("count", 0L, false);
-		}
 
 		@Override
 		public void cancel() {
 			isRunning = false;
 		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+			return collected;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			collected = state;
+		}
 	}
 	
 	public static class Mapper extends RichMapFunction<Long, Long> {