You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:37 UTC

[10/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

[FLINK-2808] [streaming] Refactor and extend state backend abstraction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/479bec0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/479bec0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/479bec0b

Branch: refs/heads/master
Commit: 479bec0b2b44315196c1f1cddeb114c79d1717db
Parents: 5ac2872
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 5 15:57:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200

----------------------------------------------------------------------
 .../flink/storm/wrappers/BoltWrapper.java       |  12 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |   2 +-
 .../storm/wrappers/WrapperSetupHelper.java      |   2 +-
 .../storm/api/FlinkTopologyBuilderTest.java     |   6 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   | 161 ++++---
 .../flink/storm/wrappers/SpoutWrapperTest.java  |  11 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  |  12 +-
 .../common/functions/AbstractRichFunction.java  |   5 +-
 .../functions/IterationRuntimeContext.java      |   2 +-
 .../api/common/functions/RichFunction.java      |  28 +-
 .../api/common/functions/RuntimeContext.java    | 163 ++++---
 .../util/AbstractRuntimeUDFContext.java         |  17 +-
 .../common/functions/util/ListCollector.java    |   5 +
 .../flink/api/common/state/OperatorState.java   |  23 +-
 .../api/common/state/StateCheckpointer.java     |  73 ---
 .../flink/configuration/ConfigConstants.java    |   5 -
 .../memory/InputViewDataInputStreamWrapper.java |   5 +-
 .../apache/flink/util/InstantiationUtil.java    |   7 +-
 .../org/apache/flink/util/SerializedValue.java  |   8 +
 .../flink/core/testutils/CommonTestUtils.java   |  21 +-
 flink-dist/src/main/resources/flink-conf.yaml   |  15 +-
 .../flink/runtime/state/FileStateHandle.java    |  31 --
 .../flink/runtime/state/LocalStateHandle.java   |  15 +-
 .../runtime/state/StateHandleProvider.java      |  39 --
 .../apache/flink/runtime/taskmanager/Task.java  |   2 -
 flink-staging/flink-fs-tests/pom.xml            |  19 +
 .../flink/hdfstests/FileStateBackendTest.java   | 308 +++++++++++++
 .../flink/hdfstests/FileStateHandleTest.java    | 126 ------
 .../kafka/testutils/MockRuntimeContext.java     |  37 +-
 .../BroadcastOutputSelectorWrapper.java         |  12 +-
 .../selector/DirectedOutputSelectorWrapper.java |  33 +-
 .../selector/OutputSelectorWrapper.java         |   2 +-
 .../streaming/api/datastream/DataStream.java    |  12 +-
 .../api/datastream/DataStreamSink.java          |   4 +-
 .../streaming/api/datastream/KeyedStream.java   |  38 +-
 .../datastream/SingleOutputStreamOperator.java  |   7 +-
 .../environment/StreamExecutionEnvironment.java |  64 +--
 .../api/functions/sink/FileSinkFunction.java    |  10 +-
 .../api/functions/sink/PrintSinkFunction.java   |   2 +-
 .../functions/source/FileSourceFunction.java    |   5 +-
 .../source/StatefulSequenceSource.java          |  36 +-
 .../flink/streaming/api/graph/StreamConfig.java | 149 +++----
 .../flink/streaming/api/graph/StreamGraph.java  |  21 +-
 .../api/graph/StreamGraphGenerator.java         |  28 +-
 .../flink/streaming/api/graph/StreamNode.java   |   9 +
 .../api/graph/StreamingJobGraphGenerator.java   |  19 +-
 .../api/operators/AbstractStreamOperator.java   | 296 ++++++++++--
 .../operators/AbstractUdfStreamOperator.java    | 163 ++++---
 .../api/operators/ChainingStrategy.java         |  47 ++
 .../api/operators/OneInputStreamOperator.java   |   4 +-
 .../flink/streaming/api/operators/Output.java   |   2 +-
 .../api/operators/StatefulStreamOperator.java   |  40 --
 .../streaming/api/operators/StreamFlatMap.java  |   5 +-
 .../api/operators/StreamGroupedFold.java        |  47 +-
 .../api/operators/StreamGroupedReduce.java      |  54 +--
 .../streaming/api/operators/StreamOperator.java |  92 ++--
 .../streaming/api/operators/StreamProject.java  |   5 +-
 .../streaming/api/operators/StreamSource.java   |   3 +-
 .../api/operators/StreamingRuntimeContext.java  | 162 +++++++
 .../api/operators/co/CoStreamFlatMap.java       |   5 +-
 .../api/state/AbstractHeapKvState.java          | 145 ++++++
 .../streaming/api/state/BasicCheckpointer.java  |  37 --
 .../streaming/api/state/EagerStateStore.java    | 104 -----
 .../streaming/api/state/KVMapCheckpointer.java  |  82 ----
 .../flink/streaming/api/state/KvState.java      |  69 +++
 .../streaming/api/state/KvStateSnapshot.java    |  69 +++
 .../api/state/OperatorStateHandle.java          |  54 ---
 .../api/state/PartitionedStateStore.java        |  55 ---
 .../state/PartitionedStreamOperatorState.java   | 182 --------
 .../flink/streaming/api/state/StateBackend.java | 135 ++++++
 .../api/state/StateBackendFactory.java          |  40 ++
 .../api/state/StreamOperatorState.java          | 132 ------
 .../streaming/api/state/StreamStateHandle.java  |  28 ++
 .../streaming/api/state/WrapperStateHandle.java |  61 ---
 .../api/state/filesystem/AbstractFileState.java |  83 ++++
 .../filesystem/FileSerializableStateHandle.java |  53 +++
 .../state/filesystem/FileStreamStateHandle.java |  46 ++
 .../api/state/filesystem/FsHeapKvState.java     |  88 ++++
 .../state/filesystem/FsHeapKvStateSnapshot.java |  95 ++++
 .../api/state/filesystem/FsStateBackend.java    | 409 +++++++++++++++++
 .../state/filesystem/FsStateBackendFactory.java |  56 +++
 .../api/state/memory/ByteStreamStateHandle.java |  52 +++
 .../api/state/memory/MemHeapKvState.java        |  52 +++
 .../state/memory/MemoryHeapKvStateSnapshot.java | 102 +++++
 .../api/state/memory/MemoryStateBackend.java    | 206 +++++++++
 .../api/state/memory/SerializedStateHandle.java |  49 ++
 .../CoFeedbackTransformation.java               |   4 +-
 .../transformations/FeedbackTransformation.java |   4 +-
 .../transformations/OneInputTransformation.java |  18 +-
 .../PartitionTransformation.java                |   6 +-
 .../transformations/SelectTransformation.java   |   9 +-
 .../api/transformations/SinkTransformation.java |  15 +-
 .../transformations/SourceTransformation.java   |   4 +-
 .../transformations/SplitTransformation.java    |   4 +-
 .../transformations/StreamTransformation.java   |   5 +-
 .../transformations/TwoInputTransformation.java |   4 +-
 .../transformations/UnionTransformation.java    |   4 +-
 .../streaming/runtime/io/CollectorWrapper.java  |  18 +-
 .../runtime/io/StreamInputProcessor.java        |   9 +-
 .../operators/BucketStreamSortOperator.java     |  18 +-
 .../operators/ExtractTimestampsOperator.java    |  14 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   9 +-
 .../windowing/NonKeyedWindowOperator.java       |  28 +-
 .../operators/windowing/WindowOperator.java     |  29 +-
 .../ExceptionInChainedOperatorException.java    |  45 ++
 .../runtime/tasks/OneInputStreamTask.java       |  14 +-
 .../streaming/runtime/tasks/OperatorChain.java  | 308 +++++++++++++
 .../streaming/runtime/tasks/OutputHandler.java  | 336 --------------
 .../runtime/tasks/SourceStreamTask.java         |  12 +-
 .../runtime/tasks/StreamIterationHead.java      |   8 +-
 .../runtime/tasks/StreamIterationTail.java      |   6 +-
 .../streaming/runtime/tasks/StreamTask.java     | 447 ++++++++++---------
 .../runtime/tasks/StreamTaskState.java          | 108 +++++
 .../runtime/tasks/StreamTaskStateList.java      |  60 +++
 .../runtime/tasks/StreamingRuntimeContext.java  | 204 ---------
 .../runtime/tasks/TwoInputStreamTask.java       |  15 +-
 .../streaming/api/AggregationFunctionTest.java  |  31 +-
 .../flink/streaming/api/DataStreamTest.java     |   5 +-
 .../api/functions/PrintSinkFunctionTest.java    |   2 +-
 .../api/graph/StreamGraphGeneratorTest.java     |  15 +-
 .../api/operators/StreamGroupedFoldTest.java    |  28 +-
 .../api/operators/StreamGroupedReduceTest.java  |  17 +-
 .../api/state/FileStateBackendTest.java         | 419 +++++++++++++++++
 .../api/state/MemoryStateBackendTest.java       | 278 ++++++++++++
 .../streaming/api/state/StateHandleTest.java    | 135 ------
 .../api/state/StatefulOperatorTest.java         | 377 ----------------
 ...AlignedProcessingTimeWindowOperatorTest.java | 209 +++++----
 ...AlignedProcessingTimeWindowOperatorTest.java | 201 +++++----
 .../runtime/tasks/StreamTaskTestHarness.java    |  13 +-
 .../runtime/tasks/StreamTaskTimerITCase.java    |  17 +-
 .../streaming/timestamp/TimestampITCase.java    |   4 +-
 .../flink/streaming/util/MockContext.java       |  74 ++-
 .../util/OneInputStreamOperatorTestHarness.java |  71 +--
 .../streaming/util/SourceFunctionUtil.java      |  19 +-
 .../util/TwoInputStreamOperatorTestHarness.java |  69 +--
 .../flink/streaming/api/scala/DataStream.scala  |  88 +---
 .../flink/streaming/api/scala/KeyedStream.scala | 106 ++++-
 .../api/scala/StreamExecutionEnvironment.scala  |  39 +-
 .../api/scala/function/StatefulFunction.scala   |  16 +-
 .../streaming/api/scala/DataStreamTest.scala    |  77 ++--
 .../streaming/api/scala/StateTestPrograms.scala |  23 +-
 .../CoStreamCheckpointingITCase.java            |  73 +--
 .../PartitionedStateCheckpointingITCase.java    |  52 +--
 .../checkpointing/StateCheckpoinedITCase.java   |  21 +-
 .../StreamCheckpointNotifierITCase.java         |  61 ++-
 .../StreamCheckpointingITCase.java              | 120 ++---
 .../UdfStreamOperatorCheckpointingITCase.java   |  50 ++-
 .../test/classloading/ClassLoaderITCase.java    |   4 +-
 .../ProcessFailureStreamingRecoveryITCase.java  |  38 +-
 149 files changed, 5747 insertions(+), 3790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index b16fc09..f0913e8 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -62,11 +61,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	private final Fields inputSchema;
 	/** The original Storm topology. */
 	protected StormTopology stormTopology;
+	
 	/**
 	 *  We have to use this because Operators must output
 	 *  {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
 	 */
-	private TimestampedCollector<OUT> flinkCollector;
+	private transient TimestampedCollector<OUT> flinkCollector;
 
 	/**
 	 * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
@@ -206,8 +206,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	}
 
 	@Override
-	public void open(final Configuration parameters) throws Exception {
-		super.open(parameters);
+	public void open() throws Exception {
+		super.open();
 
 		this.flinkCollector = new TimestampedCollector<OUT>(output);
 		OutputCollector stormCollector = null;
@@ -217,7 +217,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 					this.numberOfAttributes, flinkCollector));
 		}
 
-		GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
+		GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
 		StormConfig stormConfig = new StormConfig();
 
 		if (config != null) {
@@ -229,7 +229,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 		}
 
 		final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
-				super.runtimeContext, this.bolt, this.stormTopology, stormConfig);
+				getRuntimeContext(), this.bolt, this.stormTopology, stormConfig);
 
 		this.bolt.prepare(stormConfig, topologyContext, stormCollector);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 914a19d..e78dd5c 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.storm.util.FiniteSpout;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import com.google.common.collect.Sets;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
index d529b6a..5f1f142 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -29,7 +29,7 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.tuple.Fields;
 
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import clojure.lang.Atom;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
index e6fb8e5..906d081 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.flink.storm.api;
 
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
+
+import org.junit.Ignore;
 import org.junit.Test;
 
 import backtype.storm.tuple.Fields;
@@ -52,6 +54,7 @@ public class FlinkTopologyBuilderTest {
 	}
 
 	@Test
+	@Ignore
 	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
 		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
 
@@ -63,6 +66,7 @@ public class FlinkTopologyBuilderTest {
 	}
 
 	@Test
+	@Ignore
 	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
 		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index e33fdb9..c1485c8 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -29,18 +29,18 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.storm.wrappers.BoltWrapper;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.StormTuple;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -139,7 +139,6 @@ public class BoltWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichBolt bolt = mock(IRichBolt.class);
@@ -149,8 +148,8 @@ public class BoltWrapperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
-		wrapper.setup(mock(Output.class), taskContext);
-		wrapper.open(null);
+		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.open();
 
 		wrapper.processElement(record);
 		if (numberOfAttributes == -1) {
@@ -169,11 +168,6 @@ public class BoltWrapperTest extends AbstractTest {
 		final StreamRecord record = mock(StreamRecord.class);
 		when(record.getValue()).thenReturn(2).thenReturn(3);
 
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
 		final Output output = mock(Output.class);
 
 		final TestBolt bolt = new TestBolt();
@@ -186,8 +180,8 @@ public class BoltWrapperTest extends AbstractTest {
 		}
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
-		wrapper.setup(output, taskContext);
-		wrapper.open(null);
+		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
+		wrapper.open();
 
 		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
 		if (rawOutType1) {
@@ -214,86 +208,70 @@ public class BoltWrapperTest extends AbstractTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpen() throws Exception {
-		final StormConfig stormConfig = new StormConfig();
-		final Configuration flinkConfig = new Configuration();
-
-		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
-		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-				.thenReturn(flinkConfig);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
+		
+		// utility mocks
 		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+		
+		// (1) open with no configuration
+		{
+			ExecutionConfig execConfig = mock(ExecutionConfig.class);
+			when(execConfig.getGlobalJobParameters()).thenReturn(null);
+
+			final IRichBolt bolt = mock(IRichBolt.class);
+			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+
+			wrapper.open();
+			verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+		}
 
-		final IRichBolt bolt = mock(IRichBolt.class);
+		// (2) open with a storm specific configuration
+		{
+			final StormConfig stormConfig = new StormConfig();
 
-		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), taskContext);
+			ExecutionConfig execConfig = mock(ExecutionConfig.class);
+			when(execConfig.getGlobalJobParameters()).thenReturn(stormConfig);
+
+			final IRichBolt bolt = mock(IRichBolt.class);
+			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+
+			wrapper.open();
+			verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class));
+		}
 
-		// test without configuration
-		wrapper.open(null);
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+		// (3) open with a flink config
+		{
+			final Configuration cfg = new Configuration();
+			cfg.setString("foo", "bar");
+			cfg.setInteger("the end (the int)", Integer.MAX_VALUE);
 
-		// test with StormConfig
-		wrapper.open(null);
-		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
-				any(OutputCollector.class));
+			ExecutionConfig execConfig = mock(ExecutionConfig.class);
+			when(execConfig.getGlobalJobParameters()).thenReturn(new UnmodifiableConfiguration(cfg));
 
-		// test with Configuration
-		final TestDummyBolt testBolt = new TestDummyBolt();
-		wrapper = new BoltWrapper<Object, Object>(testBolt);
-		wrapper.setup(mock(Output.class), taskContext);
+			TestDummyBolt testBolt = new TestDummyBolt();
+			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt);
+			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
 
-		wrapper.open(null);
-		for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
-			Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+			wrapper.open();
+			for (Entry<String, String> entry : cfg.toMap().entrySet()) {
+				Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+			}
 		}
 	}
 
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testOpenSink() throws Exception {
-		final StormConfig stormConfig = new StormConfig();
-		final Configuration flinkConfig = new Configuration();
-
-		final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
-		when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-				.thenReturn(flinkConfig);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
 		final IRichBolt bolt = mock(IRichBolt.class);
-
 		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-		wrapper.setup(mock(Output.class), taskContext);
-
-		// test without configuration
-		wrapper.open(null);
-		verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
-				isNull(OutputCollector.class));
-
-		// test with StormConfig
-		wrapper.open(null);
-		verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
-				isNull(OutputCollector.class));
-
-		// test with Configuration
-		final TestDummyBolt testBolt = new TestDummyBolt();
-		wrapper = new BoltWrapper<Object, Object>(testBolt);
-		wrapper.setup(mock(Output.class), taskContext);
-
-		wrapper.open(null);
-		for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
-			Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
-		}
+		
+		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.open();
+		
+		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
 	}
 
 	@SuppressWarnings("unchecked")
@@ -306,9 +284,8 @@ public class BoltWrapperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		wrapper.setup(mock(Output.class), taskContext);
+		
+		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
 
 		wrapper.close();
 		wrapper.dispose();
@@ -351,5 +328,25 @@ public class BoltWrapperTest extends AbstractTest {
 		}
 	}
 
-
+	public static StreamTask<?, ?> createMockStreamTask() {
+		return createMockStreamTask(new ExecutionConfig());
+	}
+	
+	public static StreamTask<?, ?> createMockStreamTask(ExecutionConfig execConfig) {
+		Environment env = mock(Environment.class);
+		when(env.getTaskName()).thenReturn("Mock Task");
+		when(env.getTaskNameWithSubtasks()).thenReturn("Mock Task (1/1)");
+		when(env.getIndexInSubtaskGroup()).thenReturn(0);
+		when(env.getNumberOfSubtasks()).thenReturn(1);
+		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
+		
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+		when(mockTask.getName()).thenReturn("Mock Task (1/1)");
+		when(mockTask.getCheckpointLock()).thenReturn(new Object());
+		when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(execConfig);
+		
+		return mockTask;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
index 227d736..b81b775 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -30,11 +30,8 @@ import org.apache.flink.storm.util.FiniteSpout;
 import org.apache.flink.storm.util.FiniteTestSpout;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.storm.util.TestDummySpout;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.SpoutWrapper;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -72,7 +69,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = mock(IRichSpout.class);
@@ -112,7 +108,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = mock(IRichSpout.class);
@@ -136,7 +131,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls);
@@ -158,7 +152,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
@@ -176,7 +169,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
@@ -192,7 +184,6 @@ public class SpoutWrapperTest extends AbstractTest {
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
 		when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
 		when(taskContext.getTaskName()).thenReturn("name");
 
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index c3b0300..20e480d 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -39,9 +39,7 @@ import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
-import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
-import org.apache.flink.storm.wrappers.WrapperSetupHelper;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -181,9 +179,9 @@ public class WrapperSetupHelperTest extends AbstractTest {
 		builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
 		builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
 		builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-				.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+				.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
-				.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
 
 		int counter = 0;
@@ -207,9 +205,9 @@ public class WrapperSetupHelperTest extends AbstractTest {
 		flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
 		flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
 		flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink"))
-				.fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id"))
+				.shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
-				.fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id"))
+				.shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId)
 				.shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
 
 		flinkBuilder.createTopology();

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 5a019aa..fd9de67 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -38,10 +38,12 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
 	
 	private transient RuntimeContext runtimeContext;
 
+	@Override
 	public void setRuntimeContext(RuntimeContext t) {
 		this.runtimeContext = t;
 	}
-	
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		if (this.runtimeContext != null) {
 			return this.runtimeContext;
@@ -50,6 +52,7 @@ public abstract class AbstractRichFunction implements RichFunction, Serializable
 		}
 	}
 	
+	@Override
 	public IterationRuntimeContext getIterationRuntimeContext() {
 		if (this.runtimeContext == null) {
 			throw new IllegalStateException("The runtime context has not been initialized.");

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
index 73e738e..8239921 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/IterationRuntimeContext.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.types.Value;
 
 /**
- *
+ * 
  */
 public interface IterationRuntimeContext extends RuntimeContext {
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index 0685f63..0cbde4a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -36,7 +36,7 @@ public interface RichFunction extends Function {
 	 * The configuration contains all parameters that were configured on the function in the program
 	 * composition.
 	 * 
-	 * <pre><blockquote>
+	 * <pre>{@code
 	 * public class MyMapper extends FilterFunction<String> {
 	 * 
 	 *     private String searchString;
@@ -49,7 +49,7 @@ public interface RichFunction extends Function {
 	 *         return value.equals(searchString);
 	 *     }
 	 * }
-	 * </blockquote></pre>
+	 * }</pre>
 	 * <p>
 	 * By default, this method does nothing.
 	 * 
@@ -64,7 +64,7 @@ public interface RichFunction extends Function {
 	void open(Configuration parameters) throws Exception;
 
 	/**
-	 * Teardown method for the user code. It is called after the last call to the main working methods
+	 * Tear-down method for the user code. It is called after the last call to the main working methods
 	 * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an iteration, this method will
 	 * be invoked after each iteration superstep.
 	 * <p>
@@ -76,16 +76,32 @@ public interface RichFunction extends Function {
 	 */
 	void close() throws Exception;
 	
+	// ------------------------------------------------------------------------
+	//  Runtime context
+	// ------------------------------------------------------------------------
 	
 	/**
-	 * Gets the context that contains information about the UDF's runtime.
+	 * Gets the context that contains information about the UDF's runtime, such as the 
+	 * parallelism of the function, the subtask index of the function, or the name of
+	 * the task that executes the function.
 	 * 
-	 * Context information are for example {@link org.apache.flink.api.common.accumulators.Accumulator}s
-	 * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
+	 * <p>The RuntimeContext also gives access to the
+	 * {@link org.apache.flink.api.common.accumulators.Accumulator}s and the
+	 * {@link org.apache.flink.api.common.cache.DistributedCache}.
 	 * 
 	 * @return The UDF's runtime context.
 	 */
 	RuntimeContext getRuntimeContext();
+
+	/**
+	 * Gets a specialized version of the {@link RuntimeContext}, which has additional information
+	 * about the iteration in which the function is executed. This IterationRuntimeContext is only
+	 * available if the function is part of an iteration. Otherwise, this method throws an exception.
+	 * 
+	 * @return The IterationRuntimeContext.
+	 * @throws java.lang.IllegalStateException Thrown, if the function is not executed as part of an iteration.
+	 */
+	IterationRuntimeContext getIterationRuntimeContext();
 	
 	/**
 	 * Sets the function's runtime context. Called by the framework when creating a parallel instance of the function.

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 289f063..cadef36 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +30,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -82,11 +81,7 @@ public interface RuntimeContext {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Add this accumulator. Throws an exception if the counter is already
-	 * existing.
-	 * 
-	 * This is only needed to support generic accumulators (e.g. for
-	 * Set<String>). Didn't find a way to get this work with getAccumulator.
+	 * Add this accumulator. Throws an exception if the accumulator already exists.
 	 */
 	<V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator);
 
@@ -169,65 +164,101 @@ public interface RuntimeContext {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Returns the {@link OperatorState} with the given name of the underlying
-	 * operator instance, which can be used to store and update user state in a
-	 * fault tolerant fashion. The state will be initialized by the provided
-	 * default value, and the {@link StateCheckpointer} will be used to draw the
-	 * state snapshots.
+	 * Gets the key/value state, which is only accessible if the function is executed on
+	 * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
+	 * return the value bound to the key of the element currently processed by the function.
+	 *
+	 * <p>Because the scope of each value is the key of the currently processed element,
+	 * and the elements are distributed by the Flink runtime, the system can transparently
+	 * scale out and redistribute the state and KeyedStream.
+	 *
+	 * <p>The following code example shows how to implement a continuous counter that counts
+	 * how many times elements of a certain key occur, and emits an updated count for that
+	 * element on each occurrence. 
+	 *
+	 * <pre>{@code
+	 * DataStream<MyType> stream = ...;
+	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
+	 *
+	 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+	 *
+	 *     private OperatorState<Long> state;
+	 *
+	 *     public void open(Configuration cfg) {
+	 *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
+	 *     }
+	 *
+	 *     public Tuple2<MyType, Long> map(MyType value) {
+	 *         long count = state.value();
+	 *         state.update(count + 1);
+	 *         return new Tuple2<>(value, count);
+	 *     }
+	 * });
+	 *
+	 * }</pre>
+	 *
+	 * <p>This method attempts to deduce the type information from the given type class. If the
+	 * full type cannot be determined from the class (for example because of generic parameters),
+	 * the TypeInformation object must be manually passed via 
+	 * {@link #getKeyValueState(TypeInformation, Object)}. 
 	 * 
-	 * <p>
-	 * When storing a {@link Serializable} state the user can omit the
-	 * {@link StateCheckpointer} in which case the full state will be written as
-	 * the snapshot.
-	 * </p>
-	 * 
-	 * @param name
-	 *            Identifier for the state allowing that more operator states
-	 *            can be used by the same operator.
-	 * @param defaultState
-	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#value()} (for every
-	 *            state partition) is called before
-	 *            {@link OperatorState#update(Object)}.
-	 * @param partitioned
-	 *            Sets whether partitioning should be applied for the given
-	 *            state. If true a partitioner key must be used.
-	 * @param checkpointer
-	 *            The {@link StateCheckpointer} that will be used to draw
-	 *            snapshots from the user state.
-	 * @return The {@link OperatorState} for the underlying operator.
-	 * 
-	 * @throws IOException Thrown if the system cannot access the state.
-	 */
-	<S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException;
-
-	/**
-	 * Returns the {@link OperatorState} with the given name of the underlying
-	 * operator instance, which can be used to store and update user state in a
-	 * fault tolerant fashion. The state will be initialized by the provided
-	 * default value.
+	 * @param stateType The class of the type that is stored in the state. Used to generate
+	 *                  serializers for managed memory and checkpointing.
+	 * @param defaultState The default state value, returned when the state is accessed and
+	 *                     no value has yet been set for the key. May be null.
+	 * @param <S> The type of the state.
+	 *
+	 * @return The key/value state access.
+	 *
+	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
+	 *                                       function (function is not part of a KeyedStream).
+	 */
+	<S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState);
+
+	/**
+	 * Gets the key/value state, which is only accessible if the function is executed on
+	 * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
+	 * return the value bound to the key of the element currently processed by the function.
 	 * 
-	 * <p>
-	 * When storing a non-{@link Serializable} state the user needs to specify a
-	 * {@link StateCheckpointer} for drawing snapshots.
-	 * </p>
-	 * 
-	 * @param name
-	 *            Identifier for the state allowing that more operator states
-	 *            can be used by the same operator.
-	 * @param defaultState
-	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#value()} (for every
-	 *            state partition) is called before
-	 *            {@link OperatorState#update(Object)}.
-	 * @param partitioned
-	 *            Sets whether partitioning should be applied for the given
-	 *            state. If true a partitioner key must be used.
-	 * @return The {@link OperatorState} for the underlying operator.
-	 * 
-	 * @throws IOException Thrown if the system cannot access the state.
-	 */
-	<S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned) throws IOException;
+	 * <p>Because the scope of each value is the key of the currently processed element,
+	 * and the elements are distributed by the Flink runtime, the system can transparently
+	 * scale out and redistribute the state and KeyedStream.
+	 * 
+	 * <p>The following code example shows how to implement a continuous counter that counts
+	 * how many times elements of a certain key occur, and emits an updated count for that
+	 * element on each occurrence. 
+	 * 
+	 * <pre>{@code
+	 * DataStream<MyType> stream = ...;
+	 * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
+	 * 
+	 * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+	 * 
+	 *     private OperatorState<Long> state;
+	 *     
+	 *     public void open(Configuration cfg) {
+	 *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
+	 *     }
+	 *     
+	 *     public Tuple2<MyType, Long> map(MyType value) {
+	 *         long count = state.value();
+	 *         state.update(count + 1);
+	 *         return new Tuple2<>(value, count);
+	 *     }
+	 * });
+	 *     
+	 * }</pre>
+	 * 
+	 * @param stateType The type information for the type that is stored in the state.
+	 *                  Used to create serializers for managed memory and checkpoints.   
+	 * @param defaultState The default state value, returned when the state is accessed and
+	 *                     no value has yet been set for the key. May be null.
+	 * @param <S> The type of the state.
+	 *    
+	 * @return The key/value state access.
+	 * 
+	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
+	 *                                       function (function is not part of a KeyedStream).
+	 */
+	<S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 71be1e1..90d23cd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
@@ -35,7 +34,7 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 
 /**
@@ -164,16 +163,16 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		}
 		return (Accumulator<V, A>) accumulator;
 	}
-	
+
 	@Override
-	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
-			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
-	throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException(
+				"This state is only accessible by functions executed on a KeyedStream");
 	}
 
 	@Override
-	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned) throws IOException{
-	throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException(
+				"This state is only accessible by functions executed on a KeyedStream");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
index a3a369b..12d9fda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/ListCollector.java
@@ -22,6 +22,11 @@ import java.util.List;
 
 import org.apache.flink.util.Collector;
 
+/**
+ * A {@link Collector} that puts the collected elements into a given list.
+ * 
+ * @param <T> The type of the collected elements.
+ */
 public class ListCollector<T> implements Collector<T> {
 
 	private final List<T> list;

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 3036023..136b6f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -20,24 +20,17 @@ package org.apache.flink.api.common.state;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-
 /**
- * Base interface for all streaming operator states. It can represent both
- * partitioned (when state partitioning is defined in the program) or
- * non-partitioned user states.
+ * This state interface abstracts persistent key/value state in streaming programs.
+ * The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
  * 
- * State can be accessed and manipulated using the {@link #value()} and
- * {@link #update(T)} methods. These calls are only safe in the
- * transformation call the operator represents, for instance inside
- * {@link MapFunction#map(Object)} and can lead tp unexpected behavior in the
- * {@link AbstractRichFunction#open(Configuration)} or
- * {@link AbstractRichFunction#close()} methods.
+ * <p>The state is only accessible by functions applied on a KeyedStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
  * 
- * @param <T>
- *            Type of the operator state
+ * @param <T> Type of the value in the operator state
  */
 public interface OperatorState<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
deleted file mode 100644
index f373846..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import java.io.Serializable;
-
-/**
- * Basic interface for creating {@link OperatorState} snapshots in stateful
- * streaming programs.
- * 
- * The user needs to implement the {@link #snapshotState(S, long, long)} and
- * {@link #restoreState(C)} methods that will be called to create and restore
- * state snapshots of the given states.
- * 
- * <p>
- * Note that the {@link OperatorState} is <i>synchronously</i> checkpointed.
- * While the state is written, the state cannot be accessed or modified so the
- * function needs not return a copy of its state, but may return a reference to
- * its state.
- * </p>
- * 
- * @param <S>
- *            Type of the operator state.
- * @param <C>
- *            Type of the snapshot that will be persisted.
- */
-public interface StateCheckpointer<S, C extends Serializable> {
-
-	/**
-	 * Takes a snapshot of a given operator state. The snapshot returned will be
-	 * persisted in the state backend for this job and restored upon failure.
-	 * This method is called for all state partitions in case of partitioned
-	 * state when creating a checkpoint.
-	 * 
-	 * @param state
-	 *            The state for which the snapshot needs to be taken
-	 * @param checkpointId
-	 *            The ID of the checkpoint.
-	 * @param checkpointTimestamp
-	 *            The timestamp of the checkpoint, as derived by
-	 *            System.currentTimeMillis() on the JobManager.
-	 * 
-	 * @return A snapshot of the operator state.
-	 */
-	C snapshotState(S state, long checkpointId, long checkpointTimestamp);
-
-	/**
-	 * Restores the operator states from a given snapshot. The restores state
-	 * will be loaded back to the function. In case of partitioned state, each
-	 * partition is restored independently.
-	 * 
-	 * @param stateSnapshot
-	 *            The state snapshot that needs to be restored.
-	 * @return The state corresponding to the snapshot.
-	 */
-	S restoreState(C stateSnapshot);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 36369ab..b1ffdd8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -415,11 +415,6 @@ public final class ConfigConstants {
 	 */
 	public static final String STATE_BACKEND = "state.backend";
 	
-	/**
-	 * Directory for saving streaming checkpoints
-	 */
-	public static final String STATE_BACKEND_FS_DIR = "state.backend.fs.checkpointdir";
-	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
index 7de1d71..b4dffb1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
@@ -121,9 +121,10 @@ public class InputViewDataInputStreamWrapper implements DataInputView, Closeable
 	public double readDouble() throws IOException {
 		return in.readDouble();
 	}
-
-	@SuppressWarnings("deprecation")
+	
 	@Override
+	@Deprecated
+	@SuppressWarnings("deprecation")
 	public String readLine() throws IOException {
 		return in.readLine();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index de04dc4..8ce3e85 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -246,7 +246,7 @@ public final class InstantiationUtil {
 		}
 	}
 	
-	public static Object readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
+	public static <T> T readObjectFromConfig(Configuration config, String key, ClassLoader cl) throws IOException, ClassNotFoundException {
 		byte[] bytes = config.getBytes(key, null);
 		if (bytes == null) {
 			return null;
@@ -284,13 +284,14 @@ public final class InstantiationUtil {
 		return serializer.deserialize(record, inputViewWrapper);
 	}
 	
-	public static Object deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
+	@SuppressWarnings("unchecked")
+	public static <T> T deserializeObject(byte[] bytes, ClassLoader cl) throws IOException, ClassNotFoundException {
 		ObjectInputStream oois = null;
 		final ClassLoader old = Thread.currentThread().getContextClassLoader();
 		try {
 			Thread.currentThread().setContextClassLoader(cl);
 			oois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), cl);
-			return oois.readObject();
+			return (T) oois.readObject();
 		} finally {
 			Thread.currentThread().setContextClassLoader(old);
 			if (oois != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
index 5731fc1..504e458 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -55,6 +55,14 @@ public class SerializedValue<T> implements java.io.Serializable {
 		return serializedData == null ? null : (T) InstantiationUtil.deserializeObject(serializedData, loader);
 	}
 
+	/**
+	 * Gets the size of the serialized state.
+	 * @return The size of the serialized state.
+	 */
+	public int getSizeOfSerializedState() {
+		return serializedData.length;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 5b7afaa..4dbf04c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.core.testutils;
 
 import static org.junit.Assert.fail;
@@ -37,8 +36,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 
 /**
- * This class contains auxiliary methods for unit tests in the Nephele common module.
- * 
+ * This class contains reusable utility methods for unit tests.
  */
 public class CommonTestUtils {
 
@@ -127,9 +125,7 @@ public class CommonTestUtils {
 		T copy = null;
 		try {
 			copy = clazz.newInstance();
-		} catch (InstantiationException e) {
-			fail(e.getMessage());
-		} catch (IllegalAccessException e) {
+		} catch (InstantiationException | IllegalAccessException e) {
 			fail(e.getMessage());
 		}
 
@@ -157,19 +153,14 @@ public class CommonTestUtils {
 		baos.close();
 
 		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		ObjectInputStream ois = new ObjectInputStream(bais);
 
-		T copy;
-		try {
-			copy = (T) ois.readObject();
+		try (ObjectInputStream ois = new ObjectInputStream(bais)) {
+			@SuppressWarnings("unchecked")
+			T copy = (T) ois.readObject();
+			return copy;
 		}
 		catch (ClassNotFoundException e) {
 			throw new IOException(e);
 		}
-
-		ois.close();
-		bais.close();
-
-		return copy;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 4dd8173..1b04e35 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -79,16 +79,17 @@ webclient.port: 8080
 # The backend that will be used to store operator state checkpoints if 
 # checkpointing is enabled. 
 #
-# Supported backends: jobmanager, filesystem
-
-state.backend: jobmanager
+# Supported backends: jobmanager, filesystem, <class-name-of-factory> 
+#
+#state.backend: filesystem
 
 
-# Directory for storing checkpoints in a flink supported filesystem
-# Note: State backend must be accessible from the JobManager, use file://
-# only for local setups. 
+# Directory for storing checkpoints in a Flink-supported filesystem
+# Note: State backend must be accessible from the JobManager and all TaskManagers.
+# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems
+# (or any local file system under Windows), or "s3://" for the S3 file system.
 #
-# state.backend.fs.checkpointdir: hdfs://checkpoints
+# state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints
 
 
 #==============================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
index 091c739..c45990b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
@@ -67,35 +67,4 @@ public class FileStateHandle extends ByteStreamStateHandle {
 	public void discardState() throws Exception {
 		FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
 	}
-
-	/**
-	 * Creates a {@link StateHandleProvider} for creating
-	 * {@link FileStateHandle}s for a given checkpoint directory.
-	 * 
-	 */
-	public static StateHandleProvider<Serializable> createProvider(String checkpointDir) {
-		return new FileStateHandleProvider(checkpointDir);
-	}
-
-	/**
-	 * {@link StateHandleProvider} to generate {@link FileStateHandle}s for the
-	 * given checkpoint directory.
-	 * 
-	 */
-	private static class FileStateHandleProvider implements StateHandleProvider<Serializable> {
-
-		private static final long serialVersionUID = 3496670017955260518L;
-		private String path;
-
-		public FileStateHandleProvider(String path) {
-			this.path = path;
-		}
-
-		@Override
-		public FileStateHandle createStateHandle(Serializable state) {
-			return new FileStateHandle(state, path);
-		}
-
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index 1b524d8..f2be70a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -40,18 +40,5 @@ public class LocalStateHandle<T extends Serializable> implements StateHandle<T>
 	}
 
 	@Override
-	public void discardState() throws Exception {
-	}
-
-	public static class LocalStateHandleProvider<R extends Serializable> implements
-			StateHandleProvider<R> {
-
-		private static final long serialVersionUID = 4665419208932921425L;
-
-		@Override
-		public LocalStateHandle<R> createStateHandle(R state) {
-			return new LocalStateHandle<R>(state);
-		}
-
-	}
+	public void discardState() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
deleted file mode 100644
index bac490b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-
-/**
- * Stateful streaming operators use a StateHandleProvider to create new
- * {@link StateHandle}s to store each checkpoint in a persistent storage layer.
- */
-public interface StateHandleProvider<T> extends Serializable {
-
-	/**
-	 * Creates a new {@link StateHandle} instance that will be used to store the
-	 * state checkpoint. This method is called for each state checkpoint saved.
-	 * 
-	 * @param state
-	 *            State to be stored in the handle.
-	 * 
-	 */
-	public StateHandle<T> createStateHandle(T state);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 269222f..c8d50c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -885,7 +885,6 @@ public class Task implements Runnable {
 
 				// build a local closure 
 				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
-				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
 				Runnable runnable = new Runnable() {
@@ -919,7 +918,6 @@ public class Task implements Runnable {
 
 				// build a local closure 
 				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
-				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
 
 				Runnable runnable = new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
index fe1abb3..021d822 100644
--- a/flink-staging/flink-fs-tests/pom.xml
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -42,24 +42,42 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java-examples</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
@@ -67,6 +85,7 @@ under the License.
 			<type>test-jar</type>
 			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
 		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
new file mode 100644
index 0000000..8b7fb1c
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.StreamStateHandle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FileStateBackendTest {
+	
+	private static File TEMP_DIR;
+	
+	private static String HDFS_ROOT_URI;
+	
+	private static MiniDFSCluster HDFS_CLUSTER;
+	
+	private static FileSystem FS;
+
+	// ------------------------------------------------------------------------
+	//  startup / shutdown
+	// ------------------------------------------------------------------------
+	
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			TEMP_DIR = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+			// start an HDFS mini cluster whose base directory is the random temp directory
+			Configuration hdConf = new Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			HDFS_CLUSTER = builder.build();
+			// root URI built from the cluster's host and name node port, with a trailing slash
+			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
+					+ HDFS_CLUSTER.getNameNodePort() + "/";
+			
+			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Could not create HDFS mini cluster " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			if (HDFS_CLUSTER != null) { HDFS_CLUSTER.shutdown(); } // null when createHDFS() failed before building the cluster
+		}
+		catch (Exception ignored) {} // teardown stays best-effort: a failing shutdown must not fail the test class
+		finally { FileUtils.deleteQuietly(TEMP_DIR); } // always remove the temp directory, even if shutdown failed
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testSetupAndSerialization() {
+		try {
+			URI baseUri = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
+			// a freshly constructed backend reports its base path but has no checkpoint directory yet
+			FsStateBackend originalBackend = new FsStateBackend(baseUri);
+
+			assertFalse(originalBackend.isInitialized());
+			assertEquals(baseUri, originalBackend.getBasePath().toUri());
+			assertNull(originalBackend.getCheckpointDirectory());
+
+			// serialize / copy the backend; the copy must show the same uninitialized state
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+			assertFalse(backend.isInitialized());
+			assertEquals(baseUri, backend.getBasePath().toUri());
+			assertNull(backend.getCheckpointDirectory());
+
+			// no file operations should be possible right now
+			try {
+				backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+				fail("should fail with an exception");
+			} catch (IllegalStateException e) {
+				// expected: the backend has not been initialized for a job yet
+			}
+			// after initialization for a job, an existing and empty checkpoint directory is expected
+			backend.initializeForJob(new JobID());
+			assertNotNull(backend.getCheckpointDirectory());
+
+			Path checkpointDir = backend.getCheckpointDirectory();
+			assertTrue(FS.exists(checkpointDir));
+			assertTrue(isDirectoryEmpty(checkpointDir));
+			// disposing the job's state must reset the checkpoint directory and leave nothing under the base URI
+			backend.disposeAllStateForCurrentJob();
+			assertNull(backend.getCheckpointDirectory());
+
+			assertTrue(isDirectoryEmpty(baseUri));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSerializableState() {
+		// Checkpoints three serializable values, reads each one back through its handle, then discards it.
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+			backend.initializeForJob(new JobID());
+
+			Path checkpointDir = backend.getCheckpointDirectory();
+
+			String state1 = "dummy state";
+			String state2 = "row row row your boat";
+			Integer state3 = 42;
+			// all three states are checkpointed under the same checkpoint id
+			StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+			StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+			StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+			// the checkpoint directory must stay non-empty until the last handle has been discarded
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+			handle1.discardState();
+
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+			handle2.discardState();
+
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+			handle3.discardState();
+
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStateOutputStream() {
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri()));
+			backend.initializeForJob(new JobID());
+
+			Path checkpointDir = backend.getCheckpointDirectory();
+
+			byte[] state1 = new byte[1274673];
+			byte[] state2 = new byte[1];
+			byte[] state3 = new byte[0];
+			byte[] state4 = new byte[177];
+			// fill the buffers with random bytes (the zero-length buffer stays empty)
+			Random rnd = new Random();
+			rnd.nextBytes(state1);
+			rnd.nextBytes(state2);
+			rnd.nextBytes(state3);
+			rnd.nextBytes(state4);
+
+			long checkpointId = 97231523452L;
+			// open three independent streams under the same checkpoint id
+			FsStateBackend.FsCheckpointStateOutputStream stream1 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream2 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			FsStateBackend.FsCheckpointStateOutputStream stream3 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+			stream1.write(state1);
+			stream2.write(state2);
+			stream3.write(state3);
+			// closeAndGetHandle() closes the stream and returns the handle for the written data
+			FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+			FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+			FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+
+			// use with try-with-resources
+			StreamStateHandle handle4;
+			try (StateBackend.CheckpointStateOutputStream stream4 =
+						 backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+				stream4.write(state4);
+				handle4 = stream4.closeAndGetHandle();
+			}
+
+			// close before accessing handle
+			StateBackend.CheckpointStateOutputStream stream5 =
+					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			stream5.write(state4);
+			stream5.close();
+			try {
+				stream5.closeAndGetHandle();
+				fail();
+			} catch (IOException e) {
+				// expected: no handle can be obtained from a stream that was already closed
+			}
+			// read every handle back and discard it; the directory must be empty only after handle4 is discarded
+			validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+			handle1.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureFileDeleted(handle1.getFilePath());
+
+			validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+			handle2.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureFileDeleted(handle2.getFilePath());
+
+			validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+			handle3.discardState();
+			assertFalse(isDirectoryEmpty(checkpointDir));
+			ensureFileDeleted(handle3.getFilePath());
+
+			validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+			handle4.discardState();
+			assertTrue(isDirectoryEmpty(checkpointDir));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static void ensureFileDeleted(Path path) {
+		try { // asserts that the file is gone; an I/O error fails the test instead of silently skipping the check
+			assertFalse(FS.exists(path));
+		}
+		catch (IOException e) { fail("could not check existence of " + path + ": " + e.getMessage()); }
+	}
+
+	private static boolean isDirectoryEmpty(URI directory) {
+		return isDirectoryEmpty(new Path(directory)); // convenience overload: wraps the URI in a Path and delegates
+	}
+	
+	private static boolean isDirectoryEmpty(Path directory) {
+		try {
+			FileStatus[] nested = FS.listStatus(directory);
+			return  nested == null || nested.length == 0;
+		}
+		catch (IOException e) {
+			return true;
+		}
+	}
+
+	private static String randomHdfsFileUri() {
+		return HDFS_ROOT_URI + UUID.randomUUID(); // HDFS root URI followed by a random UUID
+	}
+
+	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+		// Reads the stream to its end, asserts it holds exactly the bytes in 'data', and closes it.
+		byte[] holder = new byte[data.length];
+		try (InputStream in = is) { // the stream is closed even when an assertion fails
+			int pos = 0, read;
+			while (pos < holder.length && (read = in.read(holder, pos, holder.length - pos)) != -1) {
+				pos += read;
+			}
+			assertEquals("not enough data", holder.length, pos);
+			assertEquals("too much data", -1, in.read());
+			assertArrayEquals("wrong data", data, holder);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
deleted file mode 100644
index 59ee5a9..0000000
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.util.SerializedValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileStateHandleTest {
-
-	private String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	private org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
-					+ hdfsCluster.getNameNodePort() + "/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
-			hdfs = hdPath.getFileSystem(hdConf);
-			hdfs.mkdirs(hdPath);
-
-		} catch (Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, true);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testFileStateHandle() throws Exception {
-
-		Serializable state = "state";
-
-		// Create a state handle provider for the hdfs directory
-		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
-				+ hdPath);
-
-		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
-		
-		try {
-			handleProvider.createStateHandle(null);
-			fail();
-		} catch (RuntimeException e) {
-			// good
-		}
-
-		assertTrue(handle.stateFetched());
-		assertFalse(handle.isWritten());
-
-		// Serialize the handle so it writes the value to hdfs
-		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
-				handle);
-		
-		assertTrue(handle.isWritten());
-		
-		// Deserialize the handle and verify that the state is not fetched yet
-		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
-				.deserializeValue(Thread.currentThread().getContextClassLoader());
-		assertFalse(deserializedHandle.stateFetched());
-
-		// Fetch the and compare with original
-		assertEquals(state, deserializedHandle.getState(this.getClass().getClassLoader()));
-
-		// Test whether discard removes the checkpoint file properly
-		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
-		deserializedHandle.discardState();
-		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
-
-	}
-
-}