You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:36 UTC

[09/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index cf3dcfc..9718b72 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,9 +28,8 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -63,69 +62,71 @@ public class MockRuntimeContext implements RuntimeContext {
 
 	@Override
 	public ExecutionConfig getExecutionConfig() {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public ClassLoader getUserCodeClassLoader() {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
+		throw new UnsupportedOperationException();
+	}
 
 	@Override
 	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public IntCounter getIntCounter(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public LongCounter getLongCounter(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public DoubleCounter getDoubleCounter(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public Histogram getHistogram(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public <RT> List<RT> getBroadcastVariable(String name) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
 	public DistributedCache getDistributedCache() {
-		return null;
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
-		return null;
+	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState, boolean partitioned) throws IOException {
-		return null;
+	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index 00c6f80..7034b11 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.collector.selector;
 
 import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -27,17 +26,16 @@ import org.apache.flink.util.Collector;
 public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
 
 	private static final long serialVersionUID = 1L;
-	private List<Collector<StreamRecord<OUT>>> outputs;
+	
+	private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
 
 	public BroadcastOutputSelectorWrapper() {
 		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
 	}
-
-	@SuppressWarnings("unchecked,rawtypes")
+	
 	@Override
-	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
-		Collector output1 = output;
-		outputs.add((Collector<StreamRecord<OUT>>) output1);
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
+		outputs.add(output);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index c6e3388..84558fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.api.collector.selector;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,32 +38,31 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
 
 	private List<OutputSelector<OUT>> outputSelectors;
 
-	private Map<String, List<Collector<StreamRecord<OUT>>>> outputMap;
-	private Set<Collector<StreamRecord<OUT>>> selectAllOutputs;
+	private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> outputMap;
+	private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
 
 	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
 		this.outputSelectors = outputSelectors;
-		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>(); //new LinkedList<Collector<OUT>>();
-		this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
+		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>();
+		this.outputMap = new HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>>();
 	}
-
-	@SuppressWarnings("unchecked,rawtypes")
+	
 	@Override
-	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
-		Collector output1 = output;
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) {
 		List<String> selectedNames = edge.getSelectedNames();
 
 		if (selectedNames.isEmpty()) {
-			selectAllOutputs.add((Collector<StreamRecord<OUT>>) output1);
-		} else {
+			selectAllOutputs.add(output);
+		}
+		else {
 			for (String selectedName : selectedNames) {
-
 				if (!outputMap.containsKey(selectedName)) {
-					outputMap.put(selectedName, new LinkedList<Collector<StreamRecord<OUT>>>());
-					outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
-				} else {
+					outputMap.put(selectedName, new ArrayList<Collector<StreamRecord<OUT>>>());
+					outputMap.get(selectedName).add(output);
+				}
+				else {
 					if (!outputMap.get(selectedName).contains(output)) {
-						outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
+						outputMap.get(selectedName).add(output);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index 9133ac0..f25c995 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.Collector;
 
 public interface OutputSelectorWrapper<OUT> extends Serializable {
 
-	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge);
+	public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge);
 
 	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 00991a7..7e686c7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -172,8 +172,9 @@ public class DataStream<T> {
 	 *            The DataStreams to union output with.
 	 * @return The {@link DataStream}.
 	 */
-	public DataStream<T> union(DataStream<T>... streams) {
-		List<StreamTransformation<T>> unionedTransforms = Lists.newArrayList();
+	@SafeVarargs
+	public final DataStream<T> union(DataStream<T>... streams) {
+		List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
 		unionedTransforms.add(this.transformation);
 
 		Collection<StreamTransformation<?>> thisPredecessors = this.getTransformation().getTransitivePredecessors();
@@ -185,6 +186,11 @@ public class DataStream<T> {
 								"This Stream: " + this.getTransformation() +
 								", other stream: " + newStream.getTransformation());
 			}
+			if (!getType().equals(newStream.getType())) {
+				throw new IllegalArgumentException("Cannot union streams of different types: "
+						+ getType() + " and " + newStream.getType());
+			}
+			
 			Collection<StreamTransformation<?>> predecessors = newStream.getTransformation().getTransitivePredecessors();
 
 			if (predecessors.contains(this.transformation) || thisPredecessors.contains(newStream.getTransformation())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index fdf398c..24104ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 
@@ -76,7 +76,7 @@ public class DataStreamSink<T> {
 	 * @return The sink with chaining disabled
 	 */
 	public DataStreamSink<T> disableChaining() {
-		this.transformation.setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER);
+		this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
 		return this;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 0da419c..cdea910 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -63,6 +63,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	
 	protected final KeySelector<T, KEY> keySelector;
 
+	protected final TypeInformation<KEY> keyType;
+	
 	/**
 	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
 	 * to partition operator state by key.
@@ -73,8 +75,23 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *            Function for determining state partitions
 	 */
 	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
-		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+		this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
+	}
+
+	/**
+	 * Creates a new {@link KeyedStream} using the given {@link KeySelector}
+	 * to partition operator state by key.
+	 *
+	 * @param dataStream
+	 *            Base stream of data
+	 * @param keySelector
+	 *            Function for determining state partitions
+	 */
+	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
+		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
+				dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
 		this.keySelector = keySelector;
+		this.keyType = keyType;
 	}
 
 	
@@ -95,7 +112,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
 		SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);
 
-		((OneInputTransformation<T, R>) returnStream.getTransformation()).setStateKeySelector(keySelector);
+		// inject the key selector and key type
+		OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
+		transform.setStateKeySelector(keySelector);
+		transform.setStateKeyType(keyType);
+		
 		return returnStream;
 	}
 
@@ -105,6 +126,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
 		DataStreamSink<T> result = super.addSink(sinkFunction);
 		result.getTransformation().setStateKeySelector(keySelector);
+		result.getTransformation().setStateKeyType(keyType);
 		return result;
 	}
 	
@@ -197,7 +219,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 */
 	public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
 		return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
-				clean(reducer), keySelector, getType()));
+				clean(reducer), getType().createSerializer(getExecutionConfig())));
 	}
 
 	/**
@@ -215,11 +237,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 */
 	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {
 
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
-				Utils.getCallLocationName(), true);
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
+				clean(folder), getType(), Utils.getCallLocationName(), true);
 
-		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder),
-				keySelector, initialValue, getType()));
+		return transform("Keyed Fold", outType, new StreamGroupedFold<>(clean(folder), initialValue));
 	}
 
 	/**
@@ -454,7 +475,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
-		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector, getType());
+		StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
+				clean(aggregate), getType().createSerializer(getExecutionConfig()));
 		return transform("Keyed Aggregation", getType(), operator);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 016cf5e..33d5a3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -141,7 +140,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * @return The operator with chaining disabled
 	 */
 	public SingleOutputStreamOperator<T, O> disableChaining() {
-		return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.NEVER);
+		return setChainingStrategy(ChainingStrategy.NEVER);
 	}
 
 	/**
@@ -152,7 +151,7 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
 	 * @return The operator with chaining set.
 	 */
 	public SingleOutputStreamOperator<T, O> startNewChain() {
-		return setChainingStrategy(AbstractStreamOperator.ChainingStrategy.HEAD);
+		return setChainingStrategy(ChainingStrategy.HEAD);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 28410fd..26e1c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -45,8 +45,6 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -65,6 +63,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.state.StateBackend;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
@@ -77,7 +76,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * An ExecutionEnvironment for streaming jobs. An instance of it is
@@ -127,8 +127,9 @@ public abstract class StreamExecutionEnvironment {
 	protected CheckpointingMode checkpointingMode;
 
 	protected boolean forceCheckpointing = false;
-
-	protected StateHandleProvider<?> stateHandleProvider;
+	
+	/** The state backend used for storing k/v state and state snapshots */
+	private StateBackend<?> defaultStateBackend;
 	
 	/** The time characteristic used by the data streams */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
@@ -155,8 +156,7 @@ public abstract class StreamExecutionEnvironment {
 	 * program via the command line client from a JAR file, the default degree
 	 * of parallelism is the one configured for that setup.
 	 *
-	 * @param parallelism
-	 * 		The parallelism
+	 * @param parallelism The parallelism
 	 */
 	public StreamExecutionEnvironment setParallelism(int parallelism) {
 		if (parallelism < 1) {
@@ -365,27 +365,40 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the {@link StateHandleProvider} used for storing operator state
-	 * checkpoints when checkpointing is enabled.
-	 * <p>
-	 * An example would be using a {@link FileStateHandle#createProvider(String)}
-	 * to use any Flink supported file system as a state backend
+	 * Sets the state backend that describes how to store and checkpoint operator state. It defines in
+	 * what form the key/value state ({@link org.apache.flink.api.common.state.OperatorState}, accessible
+	 * from operations on {@link org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
+	 * (heap, managed memory, externally), and where state snapshots/checkpoints are stored, both for
+	 * the key/value state, and for checkpointed functions (implementing the interface
+	 * {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
+	 *
+	 * <p>The {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend} for example
+	 * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
+	 * but can checkpoint only small states (some counters).
 	 * 
+	 * <p>In contrast, the {@link org.apache.flink.streaming.api.state.filesystem.FsStateBackend}
+	 * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
+	 * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
+	 * failures of individual nodes and that streaming program can be executed highly available and strongly
+	 * consistent (assuming that Flink is run in high-availability mode).
+	 *
+	 * @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
+	 * 
+	 * @see #getStateBackend()
 	 */
-	public StreamExecutionEnvironment setStateHandleProvider(StateHandleProvider<?> provider) {
-		this.stateHandleProvider = provider;
+	public StreamExecutionEnvironment setStateBackend(StateBackend<?> backend) {
+		this.defaultStateBackend = requireNonNull(backend);
 		return this;
 	}
 
 	/**
-	 * Returns the {@link org.apache.flink.runtime.state.StateHandle}
-	 *
-	 * @see #setStateHandleProvider(org.apache.flink.runtime.state.StateHandleProvider)
-	 *
-	 * @return The StateHandleProvider
+	 * Returns the state backend that defines how to store and checkpoint state.
+	 * @return The state backend that defines how to store and checkpoint state.
+	 * 
+	 * @see #setStateBackend(StateBackend)
 	 */
-	public StateHandleProvider<?> getStateHandleProvider() {
-		return stateHandleProvider;
+	public StateBackend<?> getStateBackend() {
+		return defaultStateBackend;
 	}
 
 	/**
@@ -395,8 +408,7 @@ public abstract class StreamExecutionEnvironment {
 	 * should be used.
 	 *
 	 * @param numberOfExecutionRetries
-	 * 		The number of times the system will try to re-execute failed
-	 * 		tasks.
+	 * 		The number of times the system will try to re-execute failed tasks.
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
@@ -423,7 +435,7 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The delay of time the system will wait to re-execute failed
 	 * 		tasks.
 	 */
-	public void setExecutionRetryDelay(long executionRetryDelay){
+	public void setExecutionRetryDelay(long executionRetryDelay) {
 		config.setExecutionRetryDelay(executionRetryDelay);
 	}
 	
@@ -434,7 +446,7 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @return The delay time the system will wait to re-execute failed tasks.
 	 */
-	public long getExecutionRetryDelay(){
+	public long getExecutionRetryDelay() {
 		return config.getExecutionRetryDelay();
 	}
 	/**
@@ -550,7 +562,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @param characteristic The time characteristic.
 	 */
 	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
-		this.timeCharacteristic = Objects.requireNonNull(characteristic);
+		this.timeCharacteristic = requireNonNull(characteristic);
 		if (characteristic == TimeCharacteristic.ProcessingTime) {
 			getConfig().disableTimestamps();
 			getConfig().setAutoWatermarkInterval(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 1cf5c07..504bc39 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.functions.sink;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,8 +38,11 @@ import org.slf4j.LoggerFactory;
  *            Input type
  */
 public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+	
 	private static final long serialVersionUID = 1L;
+	
 	private static final Logger LOG = LoggerFactory.getLogger(FileSinkFunction.class);
+	
 	protected ArrayList<IN> tupleList = new ArrayList<IN>();
 	protected volatile OutputFormat<IN> format;
 	protected volatile boolean cleanupCalled = false;
@@ -51,8 +55,8 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		format.configure(context.getTaskStubParameters());
+		RuntimeContext context = getRuntimeContext();
+		format.configure(parameters);
 		indexInSubtaskGroup = context.getIndexOfThisSubtask();
 		currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
 		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
index 5a9c7a8..93a91cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.functions.sink;
 import java.io.PrintStream;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 /**
  * Implementation of the SinkFunction writing every tuple to the standard

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index 253c076..cc3925c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
@@ -52,7 +52,8 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 	public void open(Configuration parameters) throws Exception {
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
 		this.provider = context.getInputSplitProvider();
-		format.configure(context.getTaskStubParameters());
+		
+		format.configure(parameters);
 		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
 
 		splitIterator = getInputSplits();

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 2d74e38..14badf1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -17,24 +17,21 @@
  */
 package org.apache.flink.streaming.api.functions.source;
 
-
-import java.io.IOException;
-
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 
 /**
  * A stateful streaming source that emits each number from a given interval exactly once,
  * possibly in parallel.
  */
-public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
+public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
+	
 	private static final long serialVersionUID = 1L;
 
 	private final long start;
 	private final long end;
 
-	private OperatorState<Long> collected;
+	private long collected;
 
 	private volatile boolean isRunning = true;
 
@@ -62,25 +59,28 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
 				((end - start + 1) % stepSize > (congruence - start)) ?
 					((end - start + 1) / stepSize + 1) :
 					((end - start + 1) / stepSize);
-					
-		Long currentCollected = collected.value();
+		
 
-		while (isRunning && currentCollected < toCollect) {
+		while (isRunning && collected < toCollect) {
 			synchronized (checkpointLock) {
-				ctx.collect(currentCollected * stepSize + congruence);
-				collected.update(currentCollected + 1);
+				ctx.collect(collected * stepSize + congruence);
+				collected++;
 			}
-			currentCollected = collected.value();
 		}
 	}
-	
-	@Override
-	public void open(Configuration conf) throws IOException{
-		collected = getRuntimeContext().getOperatorState("collected", 0L, false);
-	}
 
 	@Override
 	public void cancel() {
 		isRunning = false;
 	}
+
+	@Override
+	public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+		return collected;
+	}
+
+	@Override
+	public void restoreState(Long state) {
+		collected = state;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2c422d9..55afc93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -27,11 +27,11 @@ import java.util.Map;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -48,9 +48,7 @@ public class StreamConfig implements Serializable {
 	private static final String CHAINED_OUTPUTS = "chainedOutputs";
 	private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
 	private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
-	private static final String OUTPUT_NAME = "outputName_";
 	private static final String VERTEX_NAME = "vertexID";
-	private static final String OPERATOR_NAME = "operatorName";
 	private static final String ITERATION_ID = "iterationId";
 	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
 	private static final String SERIALIZEDUDF = "serializedUDF";
@@ -58,8 +56,7 @@ public class StreamConfig implements Serializable {
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
 	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
 	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
-	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
-	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
 	private static final String ITERATON_WAIT = "iterationWait";
 	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
@@ -67,10 +64,12 @@ public class StreamConfig implements Serializable {
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
 
 	private static final String CHECKPOINTING_ENABLED = "checkpointing";
-	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
-	private static final String STATE_PARTITIONER = "statePartitioner";
 	private static final String CHECKPOINT_MODE = "checkpointMode";
 	
+	private static final String STATE_BACKEND = "statebackend";
+	private static final String STATE_PARTITIONER = "statePartitioner";
+	private static final String STATE_KEY_SERIALIZER = "statekeyser";
+	
 	
 	// ------------------------------------------------------------------------
 	//  Default Values
@@ -97,7 +96,6 @@ public class StreamConfig implements Serializable {
 	// ------------------------------------------------------------------------
 	//  Configured Properties
 	// ------------------------------------------------------------------------
-	
 
 	public void setVertexID(Integer vertexID) {
 		config.setInteger(VERTEX_NAME, vertexID);
@@ -106,15 +104,7 @@ public class StreamConfig implements Serializable {
 	public Integer getVertexID() {
 		return config.getInteger(VERTEX_NAME, -1);
 	}
-
-	public void setOperatorName(String name) {
-		config.setString(OPERATOR_NAME, name);
-	}
-
-	public String getOperatorName() {
-		return config.getString(OPERATOR_NAME, "Missing");
-	}
-
+	
 	public void setTypeSerializerIn1(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}
@@ -123,49 +113,29 @@ public class StreamConfig implements Serializable {
 		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
 	}
 
-	public void setTypeSerializerOut1(TypeSerializer<?> serializer) {
+	public void setTypeSerializerOut(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
-
-	public void setTypeSerializerOut2(TypeSerializer<?> serializer) {
-		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
-	}
-
-	@SuppressWarnings("unchecked")
+	
 	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
 		try {
-			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_IN_1, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_1, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
 		try {
-			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_IN_2, cl);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate serializer.", e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	public <T> TypeSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
-		try {
-			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_OUT_1, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_2, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
-	public <T> TypeSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+	
+	public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
 		try {
-			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					TYPE_SERIALIZER_OUT_2, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
@@ -202,9 +172,7 @@ public class StreamConfig implements Serializable {
 	
 	public <T> T getStreamOperator(ClassLoader cl) {
 		try {
-			@SuppressWarnings("unchecked")
-			T result = (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
-			return result;
+			return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
 		}
 		catch (ClassNotFoundException e) {
 			String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
@@ -230,12 +198,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
 		try {
-			return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
-					OUTPUT_SELECTOR_WRAPPER, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
 		}
@@ -280,11 +246,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Cannot serialize non chained outputs.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
 		try {
-			List<StreamEdge> nonChainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
+			List<StreamEdge> nonChainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
 			return nonChainedOutputs == null ?  new ArrayList<StreamEdge>() : nonChainedOutputs;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate non chained outputs.", e);
@@ -298,11 +263,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Cannot serialize chained outputs.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
 		try {
-			List<StreamEdge> chainedOutputs = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
+			List<StreamEdge> chainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
 			return chainedOutputs == null ? new ArrayList<StreamEdge>() : chainedOutputs;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate chained outputs.", e);
@@ -316,12 +280,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Cannot serialize outward edges.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public List<StreamEdge> getOutEdges(ClassLoader cl) {
 		try {
-			List<StreamEdge> outEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, OUT_STREAM_EDGES, cl);
+			List<StreamEdge> outEdges = InstantiationUtil.readObjectFromConfig(this.config, OUT_STREAM_EDGES, cl);
 			return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate outputs.", e);
@@ -335,12 +297,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Cannot serialize inward edges.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
 		try {
-			List<StreamEdge> inEdges = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, IN_STREAM_EDGES, cl);
+			List<StreamEdge> inEdges = InstantiationUtil.readObjectFromConfig(this.config, IN_STREAM_EDGES, cl);
 			return inEdges == null ? new ArrayList<StreamEdge>() : inEdges;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate inputs.", e);
@@ -378,12 +338,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not serialize outputs in order.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
 		try {
-			List<StreamEdge> outEdgesInOrder = (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
-					this.config, EDGES_IN_ORDER, cl);
+			List<StreamEdge> outEdgesInOrder = InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
 			return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate outputs in order.", e);
@@ -398,31 +356,31 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not serialize configuration.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
 		try {
-			Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
-					.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
+			Map<Integer, StreamConfig> confs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
 			return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate configuration.", e);
 		}
 	}
 	
-	public void setStateHandleProvider(StateHandleProvider<?> provider) {
+	// ------------------------------------------------------------------------
+	//  State backend
+	// ------------------------------------------------------------------------
+	
+	public void setStateBackend(StateBackend<?> backend) {
 		try {
-			InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
+			InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
 		} catch (IOException e) {
-			throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+			throw new StreamTaskException("Could not serialize state backend.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
-	public <R> StateHandleProvider<R> getStateHandleProvider(ClassLoader cl) {
+	
+	public StateBackend<?> getStateBackend(ClassLoader cl) {
 		try {
-			return (StateHandleProvider<R>) InstantiationUtil
-					.readObjectFromConfig(this.config, STATEHANDLE_PROVIDER, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
 		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
+			throw new StreamTaskException("Could not instantiate state backend.", e);
 		}
@@ -435,17 +393,35 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not serialize state partitioner.", e);
 		}
 	}
-
-	@SuppressWarnings("unchecked")
+	
 	public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
 		try {
-			return (KeySelector<?, Serializable>) InstantiationUtil
-					.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
+			return InstantiationUtil.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate state partitioner.", e);
 		}
 	}
+	
+	public void setStateKeySerializer(TypeSerializer<?> serializer) {
+		try {
+			InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize state key serializer.", e);
+		}
+	}
 
+	public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, STATE_KEY_SERIALIZER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Miscellaneous
+	// ------------------------------------------------------------------------
+	
 	public void setChainStart() {
 		config.setBoolean(IS_CHAINED_VERTEX, true);
 	}
@@ -463,7 +439,6 @@ public class StreamConfig implements Serializable {
 		builder.append("\n=======================");
 		builder.append("Stream Config");
 		builder.append("=======================");
-		builder.append("\nTask name: ").append(getVertexID());
 		builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
 		builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
 		builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2ca82b1..4c5c19c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -43,7 +43,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +50,7 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -60,9 +60,12 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.sling.commons.json.JSONException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Class representing the streaming topology. It contains all the information
  * necessary to build the jobgraph for the execution.
@@ -93,7 +96,7 @@ public class StreamGraph extends StreamingPlan {
 
 	protected Map<Integer, String> vertexIDtoBrokerID;
 	protected Map<Integer, Long> vertexIDtoLoopTimeout;
-	private StateHandleProvider<?> stateHandleProvider;
+	private StateBackend<?> stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
 	private boolean forceCheckpoint = false;
@@ -145,12 +148,12 @@ public class StreamGraph extends StreamingPlan {
 		this.forceCheckpoint = true;
 	}
 
-	public void setStateHandleProvider(StateHandleProvider<?> provider) {
-		this.stateHandleProvider = provider;
+	public void setStateBackend(StateBackend<?> backend) {
+		this.stateBackend = requireNonNull(backend);
 	}
 
-	public StateHandleProvider<?> getStateHandleProvider() {
-		return this.stateHandleProvider;
+	public StateBackend<?> getStateBackend() {
+		return this.stateBackend;
 	}
 
 	public long getCheckpointingInterval() {
@@ -392,8 +395,10 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
-	public void setKey(Integer vertexID, KeySelector<?, ?> key) {
-		getStreamNode(vertexID).setStatePartitioner(key);
+	public void setKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
+		StreamNode node = getStreamNode(vertexID);
+		node.setStatePartitioner(keySelector);
+		node.setStateKeySerializer(keySerializer);
 	}
 
 	public void setBufferTimeout(Integer vertexID, long bufferTimeout) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 774c00b..4a87eb3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -17,8 +17,7 @@
  */
 package org.apache.flink.streaming.api.graph;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
@@ -36,8 +35,10 @@ import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -97,17 +98,19 @@ public class StreamGraphGenerator {
 	private StreamGraphGenerator(StreamExecutionEnvironment env) {
 		this.streamGraph = new StreamGraph(env);
 		this.streamGraph.setChaining(env.isChainingEnabled());
+		
 		if (env.getCheckpointInterval() > 0) {
 			this.streamGraph.setCheckpointingEnabled(true);
 			this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
 			this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
 		}
-		this.streamGraph.setStateHandleProvider(env.getStateHandleProvider());
+		this.streamGraph.setStateBackend(env.getStateBackend());
 		if (env.isForceCheckpointing()) {
 			this.streamGraph.forceCheckpoint();
 		}
+		
 		this.env = env;
-		this.alreadyTransformed = Maps.newHashMap();
+		this.alreadyTransformed = new HashMap<>();
 	}
 
 	/**
@@ -202,7 +205,7 @@ public class StreamGraphGenerator {
 	 */
 	private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
 		List<StreamTransformation<T>> inputs = union.getInputs();
-		List<Integer> resultIds = Lists.newArrayList();
+		List<Integer> resultIds = new ArrayList<>();
 
 		for (StreamTransformation<T> input: inputs) {
 			resultIds.addAll(transform(input));
@@ -220,7 +223,7 @@ public class StreamGraphGenerator {
 	 */
 	private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
 		StreamTransformation<T> input = partition.getInput();
-		List<Integer> resultIds = Lists.newArrayList();
+		List<Integer> resultIds = new ArrayList<>();
 
 		Collection<Integer> transformedIds = transform(input);
 		for (Integer transformedId: transformedIds) {
@@ -273,7 +276,7 @@ public class StreamGraphGenerator {
 			return alreadyTransformed.get(select);
 		}
 
-		List<Integer> virtualResultIds = Lists.newArrayList();
+		List<Integer> virtualResultIds = new ArrayList<>();
 
 		for (int inputId : resultIds) {
 			int virtualId = StreamTransformation.getNewNodeId();
@@ -301,7 +304,7 @@ public class StreamGraphGenerator {
 		}
 
 		StreamTransformation<T> input = iterate.getInput();
-		List<Integer> resultIds = Lists.newArrayList();
+		List<Integer> resultIds = new ArrayList<>();
 
 		// first transform the input stream(s) and store the result IDs
 		resultIds.addAll(transform(input));
@@ -442,7 +445,8 @@ public class StreamGraphGenerator {
 
 
 		if (sink.getStateKeySelector() != null) {
-			streamGraph.setKey(sink.getId(), sink.getStateKeySelector());
+			TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig());
+			streamGraph.setKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
 		}
 
 		return Collections.emptyList();
@@ -471,7 +475,8 @@ public class StreamGraphGenerator {
 				transform.getName());
 
 		if (transform.getStateKeySelector() != null) {
-			streamGraph.setKey(transform.getId(), transform.getStateKeySelector());
+			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
+			streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
 		}
 
 		streamGraph.setParallelism(transform.getId(), transform.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 9110cd3..608e648 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -50,6 +50,7 @@ public class StreamNode implements Serializable {
 	private Integer slotSharingID;
 	private boolean isolatedSlot = false;
 	private KeySelector<?,?> statePartitioner;
+	private TypeSerializer<?> stateKeySerializer;
 
 	private transient StreamOperator<?> operator;
 	private List<OutputSelector<?>> outputSelectors;
@@ -233,6 +234,14 @@ public class StreamNode implements Serializable {
 		this.statePartitioner = statePartitioner;
 	}
 
+	public TypeSerializer<?> getStateKeySerializer() {
+		return stateKeySerializer;
+	}
+
+	public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
+		this.stateKeySerializer = stateKeySerializer;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index d8e81cf..45cfff1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
@@ -211,13 +211,9 @@ public class StreamingJobGraphGenerator {
 			for (StreamEdge chainable : chainedOutputs) {
 				outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
 			}
-			String returnOperatorName = operatorName + " -> ("
-					+ StringUtils.join(outputChainedNames, ", ") + ")";
-			return returnOperatorName;
+			return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
 		} else if (chainedOutputs.size() == 1) {
-			String returnOperatorName = operatorName + " -> "
-					+ chainedNames.get(chainedOutputs.get(0).getTargetId());
-			return returnOperatorName;
+			return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
 		} else {
 			return operatorName;
 		}
@@ -249,9 +245,7 @@ public class StreamingJobGraphGenerator {
 		builtVertices.add(vertexID);
 		jobGraph.addVertex(jobVertex);
 
-		StreamConfig retConfig = new StreamConfig(jobVertex.getConfiguration());
-		retConfig.setOperatorName(chainedNames.get(vertexID));
-		return retConfig;
+		return new StreamConfig(jobVertex.getConfiguration());
 	}
 
 	@SuppressWarnings("unchecked")
@@ -265,7 +259,7 @@ public class StreamingJobGraphGenerator {
 
 		config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
 		config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
-		config.setTypeSerializerOut1(vertex.getTypeSerializerOut());
+		config.setTypeSerializerOut(vertex.getTypeSerializerOut());
 
 		config.setStreamOperator(vertex.getOperator());
 		config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
@@ -277,13 +271,14 @@ public class StreamingJobGraphGenerator {
 		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
 		if (streamGraph.isCheckpointingEnabled()) {
 			config.setCheckpointMode(streamGraph.getCheckpointingMode());
-			config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+			config.setStateBackend(streamGraph.getStateBackend());
 		} else {
 			// the at least once input handler is slightly cheaper (in the absence of checkpoints),
 			// so we use that one if checkpointing is not enabled
 			config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
 		}
 		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
+		config.setStateKeySerializer(vertex.getStateKeySerializer());
 
 		
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 87041eb..e99d54d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,61 +19,302 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.state.KvState;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Base class for all stream operators.
- * 
- * Operators that contain a user function should extend the class 
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
+ * Base class for all stream operators. Operators that contain a user function should extend the class 
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). 
  * 
+ * <p>For concrete implementations, one of the following two interfaces must also be implemented, to
+ * mark the operator as unary or binary:
+ * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator}.
+ *
+ * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
+ * the timer service, timer callbacks are also guaranteed not to be called concurrently with
+ * methods on {@code StreamOperator}.
+ *
  * @param <OUT> The output type of the operator
  */
-public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT> {
+public abstract class AbstractStreamOperator<OUT> 
+		implements StreamOperator<OUT>, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
+	
+	/** The logger used by the operator class and its subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
 
-	protected transient StreamingRuntimeContext runtimeContext;
+	// ----------- configuration properties -------------
 
-	protected transient ExecutionConfig executionConfig;
+	// A sane default for most operators
+	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+	
+	private boolean inputCopyDisabled = false;
+	
+	// ---------------- runtime fields ------------------
 
-	protected transient Output<StreamRecord<OUT>> output;
+	/** The task that contains this operator (and other operators in the same chain) */
+	private transient StreamTask<?, ?> container;
+	
+	private transient StreamConfig config;
 
-	protected boolean inputCopyDisabled = false;
+	protected transient Output<StreamRecord<OUT>> output;
 
-	// A sane default for most operators
-	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+	/** The runtime context for UDFs */
+	private transient StreamingRuntimeContext runtimeContext;
 
+	// ---------------- key/value state ------------------
+	
+	/** Key selector used to get the key for the state. Non-null only if the operator uses key/value state */
+	private transient KeySelector<?, ?> stateKeySelector;
+	
+	private transient KvState<?, ?, ?> keyValueState;
+	
+	private transient KvStateSnapshot<?, ?, ?> keyValueStateSnapshot;
+	
 	// ------------------------------------------------------------------------
 	//  Life Cycle
 	// ------------------------------------------------------------------------
-	
+
 	@Override
-	public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		this.container = containingTask;
+		this.config = config;
 		this.output = output;
-		this.executionConfig = runtimeContext.getExecutionConfig();
-		this.runtimeContext = runtimeContext;
+		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
 	}
 
 	/**
-	 * This default implementation of the interface method does nothing.
+	 * This method is called immediately before any elements are processed, it should contain the
+	 * operator's initialization logic.
+	 *
+	 * <p>The default implementation does nothing.
+	 * 
+	 * @throws Exception An exception in this method causes the operator to fail.
 	 */
 	@Override
-	public void open(Configuration parameters) throws Exception {}
+	public void open() throws Exception {}
 
 	/**
-	 * This default implementation of the interface method does nothing.
+	 * This method is called after all records have been added to the operators via the methods
+	 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
+	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
+	 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
+	 *
+	 * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
+	 * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
+	 * because the last data items are not processed properly.
+	 *
+	 * @throws Exception An exception in this method causes the operator to fail.
 	 */
 	@Override
 	public void close() throws Exception {}
+	
+	/**
+	 * This method is called at the very end of the operator's life, both in the case of a successful
+	 * completion of the operation, and in the case of a failure and canceling.
+	 *
+	 * <p>This method is expected to make a thorough effort to release all resources
+	 * that the operator has acquired.
+	 */
+	@Override
+	public void dispose() {
+		if (keyValueState != null) {
+			keyValueState.dispose();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		// here, we deal with operator checkpoints and key/value state snapshots
+		
+		StreamTaskState state = new StreamTaskState();
+		
+		// (1) checkpoint the operator, if the operator is stateful (this base class has no operator state of its own)
+		
+		// (2) draw a snapshot of the key/value state 
+		if (keyValueState != null) {
+			KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
+			state.setKvState(snapshot);
+		}
+		
+		return state;
+	}
+	
+	@Override
+	public void restoreState(StreamTaskState state) throws Exception {
+		// (1) restore the operator state, if the operator is stateful
+		
+		// (2) restore the key/value state. the actual restore happens lazily, when the function requests
+		//     the state again, because the restore method needs information provided by the user function
+		keyValueStateSnapshot = state.getKvState();
+	}
+	
+	@Override
+	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+		// by default, nothing needs a notification of checkpoint completion
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties and Services
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the execution config defined on the execution environment of the job to which this
+	 * operator belongs.
+	 * 
+	 * @return The job's execution config.
+	 */
+	public ExecutionConfig getExecutionConfig() {
+		return container.getExecutionConfig();
+	}
+	
+	public StreamConfig getOperatorConfig() {
+		return config;
+	}
+	
+	public StreamTask<?, ?> getContainingTask() {
+		return container;
+	}
+	
+	public ClassLoader getUserCodeClassloader() {
+		return container.getUserCodeClassLoader();
+	}
+	
+	/**
+	 * Returns a context that allows the operator to query information about the execution and also
+	 * to interact with systems such as broadcast variables and managed state. This also allows
+	 * to register timers.
+	 */
+	public StreamingRuntimeContext getRuntimeContext() {
+		return runtimeContext;
+	}
+
+	public StateBackend<?> getStateBackend() {
+		return container.getStateBackend();
+	}
 
 	/**
-	 * This default implementation of the interface method does nothing.
+	 * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
+	 * This call is guaranteed to not happen concurrently with method calls on the operator.
+	 *
+	 * @param time The absolute time in milliseconds.
+	 * @param target The target to be triggered.
 	 */
+	protected void registerTimer(long time, Triggerable target) {
+		container.registerTimer(time, target);
+	}
+
+	/**
+	 * Creates a key/value state handle, using the state backend configured for this task.
+	 *
+	 * @param stateType The type information for the state type, used for managed memory and state snapshots.
+	 * @param defaultValue The default value that the state should return for keys that currently have
+	 *                     no value associated with them 
+	 *
+	 * @param <V> The type of the state value.
+	 *
+	 * @return The key/value state for this operator.
+	 *
+	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
+	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
+	 */
+	protected <V> OperatorState<V> createKeyValueState(
+			TypeInformation<V> stateType, V defaultValue) throws Exception
+	{
+		return createKeyValueState(stateType.createSerializer(getExecutionConfig()), defaultValue);
+	}
+	
+	/**
+	 * Creates a key/value state handle, using the state backend configured for this task.
+	 * 
+	 * @param valueSerializer The type serializer for the state type, used for managed memory and state snapshots.
+	 * @param defaultValue The default value that the state should return for keys that currently have
+	 *                     no value associated with them 
+	 * 
+	 * @param <K> The type of the state key.
+	 * @param <V> The type of the state value.
+	 * @param <Backend> The type of the state backend that creates the key/value state.
+	 * 
+	 * @return The key/value state for this operator.
+	 * 
+	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
+	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
+	 */
+	protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
+			TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
+	{
+		if (keyValueState != null) {
+			throw new IllegalStateException("The key/value state has already been created");
+		}
+		
+		// first time state access, make sure we load the state partitioner
+		if (stateKeySelector == null) {
+			stateKeySelector = config.getStatePartitioner(getUserCodeClassloader());
+			if (stateKeySelector == null) {
+				throw new UnsupportedOperationException("The function or operator is not executed " +
+						"on a KeyedStream and can hence not access the key/value state");
+			}
+		}
+		
+		// create the key and value serializers
+		TypeSerializer<K> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
+		if (keySerializer == null) {
+			throw new Exception("State key serializer has not been configured in the config.");
+		}
+		
+		@SuppressWarnings("unchecked")
+		Backend stateBackend = (Backend) container.getStateBackend();
+		
+		// check whether we restore the key/value state from a snapshot, or create a new blank one
+		if (keyValueStateSnapshot != null) {
+			@SuppressWarnings("unchecked")
+			KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshot;
+
+			KvState<K, V, Backend> kvstate = snapshot.restoreState(
+					stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
+			keyValueState = kvstate;
+			
+			// make sure we have no redundant copies in memory, let the GC clean up
+			keyValueStateSnapshot = null;
+			
+			return kvstate;
+		}
+		else {
+			// create a new blank key/value state
+			KvState<K, V, Backend> kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
+			keyValueState = kvstate;
+			return kvstate;
+		}
+	}
+	
 	@Override
-	public void dispose() {}
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void setKeyContextElement(StreamRecord record) throws Exception {
+		if (stateKeySelector != null && keyValueState != null) {
+			KvState kv = keyValueState;
+			KeySelector selector = stateKeySelector;
+			kv.setCurrentKey(selector.getKey(record.getValue()));
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Context and chaining properties
@@ -83,12 +324,12 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 	public final void setChainingStrategy(ChainingStrategy strategy) {
 		this.chainingStrategy = strategy;
 	}
-
+	
 	@Override
 	public final ChainingStrategy getChainingStrategy() {
 		return chainingStrategy;
 	}
-
+	
 	@Override
 	public boolean isInputCopyingDisabled() {
 		return inputCopyDisabled;
@@ -96,14 +337,9 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	/**
 	 * Enable object-reuse for this operator instance. This overrides the setting in
-	 * the {@link org.apache.flink.api.common.ExecutionConfig}/
+	 * the {@link org.apache.flink.api.common.ExecutionConfig}.
 	 */
 	public void disableInputCopy() {
 		this.inputCopyDisabled = true;
 	}
-	
-	@Override
-	public StreamingRuntimeContext getRuntimeContext(){
-		return runtimeContext;
-	}
 }