You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:46 UTC

[19/24] flink git commit: [FLINK-2550] [streaming] Allow multiple key/value states per operator on top of the new state backend

[FLINK-2550] [streaming] Allow multiple key/value states per operator on top of the new state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c205432
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c205432
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c205432

Branch: refs/heads/master
Commit: 7c2054326b259bde0e8dd28d3428e63fd285454c
Parents: bb1f5fd
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 23:59:28 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200

----------------------------------------------------------------------
 .../flink/storm/api/FlinkTopologyBuilder.java   |   1 +
 .../storm/util/SplitStreamTypeKeySelector.java  |   5 +-
 .../storm/api/FlinkTopologyBuilderTest.java     |   3 -
 .../api/common/functions/RuntimeContext.java    |  10 +-
 .../util/AbstractRuntimeUDFContext.java         |   4 +-
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  16 +++
 .../kafka/testutils/MockRuntimeContext.java     |   4 +-
 .../streaming/api/datastream/DataStream.java    |   4 +-
 .../api/operators/AbstractStreamOperator.java   | 102 ++++++++++++-------
 .../api/operators/StreamGroupedFold.java        |   4 +-
 .../api/operators/StreamGroupedReduce.java      |   4 +-
 .../api/operators/StreamingRuntimeContext.java  |  38 ++++---
 .../runtime/tasks/StreamTaskState.java          |  36 +++++--
 .../streaming/util/keys/KeySelectorUtil.java    |  94 +++++++++++++----
 .../api/ChainedRuntimeContextTest.java          |   3 +-
 .../util/keys/ArrayKeySelectorTest.java         |  20 ++--
 .../streaming/api/scala/ConnectedStreams.scala  |   1 -
 .../flink/streaming/api/scala/KeyedStream.scala |   9 ++
 .../api/scala/function/StatefulFunction.scala   |   2 +-
 .../streaming/api/scala/DataStreamTest.scala    |   2 +-
 .../streaming/api/scala/StateTestPrograms.scala |  24 +++--
 .../PartitionedStateCheckpointingITCase.java    |  23 +++--
 .../StreamCheckpointingITCase.java              |   2 +-
 23 files changed, 292 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
index 99de0e2..9c41d88 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -200,6 +200,7 @@ public class FlinkTopologyBuilder {
 									} else {
 										inputStream = inputStream
 												.keyBy(new SplitStreamTypeKeySelector(
+														inputStream.getType(),
 														prodDeclarer.getGroupingFieldIndexes(
 																inputStreamId,
 																grouping.get_fields())));

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
index 44c693c..71e5b86 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.storm.util;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -34,8 +35,8 @@ public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<T
 
 	private final ArrayKeySelector<Tuple> selector;
 
-	public SplitStreamTypeKeySelector(int... fields) {
-		this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+	public SplitStreamTypeKeySelector(TypeInformation<Tuple> type, int... fields) {
+		this.selector = KeySelectorUtil.getSelectorForArray(fields, type);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
index 906d081..fa5c8d8 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
@@ -21,7 +21,6 @@ import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import backtype.storm.tuple.Fields;
@@ -54,7 +53,6 @@ public class FlinkTopologyBuilderTest {
 	}
 
 	@Test
-	@Ignore
 	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
 		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
 
@@ -66,7 +64,6 @@ public class FlinkTopologyBuilderTest {
 	}
 
 	@Test
-	@Ignore
 	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
 		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index cadef36..7f767c3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -167,6 +167,7 @@ public interface RuntimeContext {
 	 * Gets the key/value state, which is only accessible if the function is executed on
 	 * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
 	 * return the value bound to the key of the element currently processed by the function.
+	 * Each operator may maintain multiple key/value states, addressed with different names.
 	 *
 	 * <p>Because the scope of each value is the key of the currently processed element,
 	 * and the elements are distributed by the Flink runtime, the system can transparently
@@ -200,8 +201,9 @@ public interface RuntimeContext {
 	 * <p>This method attempts to deduce the type information from the given type class. If the
 	 * full type cannot be determined from the class (for example because of generic parameters),
 	 * the TypeInformation object must be manually passed via 
-	 * {@link #getKeyValueState(TypeInformation, Object)}. 
+	 * {@link #getKeyValueState(String, TypeInformation, Object)}. 
 	 * 
+	 * @param name The name of the key/value state.
 	 * @param stateType The class of the type that is stored in the state. Used to generate
 	 *                  serializers for managed memory and checkpointing.
 	 * @param defaultState The default state value, returned when the state is accessed and
@@ -213,12 +215,13 @@ public interface RuntimeContext {
 	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
 	 *                                       function (function is not part os a KeyedStream).
 	 */
-	<S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState);
+	<S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
 
 	/**
 	 * Gets the key/value state, which is only accessible if the function is executed on
 	 * a KeyedStream. Upon calling {@link OperatorState#value()}, the key/value state will
 	 * return the value bound to the key of the element currently processed by the function.
+	 * Each operator may maintain multiple key/value states, addressed with different names.
 	 * 
 	 * <p>Because the scope of each value is the key of the currently processed element,
 	 * and the elements are distributed by the Flink runtime, the system can transparently
@@ -249,6 +252,7 @@ public interface RuntimeContext {
 	 *     
 	 * }</pre>
 	 * 
+	 * @param name The name of the key/value state.
 	 * @param stateType The type information for the type that is stored in the state.
 	 *                  Used to create serializers for managed memory and checkpoints.   
 	 * @param defaultState The default state value, returned when the state is accessed and
@@ -260,5 +264,5 @@ public interface RuntimeContext {
 	 * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
 	 *                                       function (function is not part os a KeyedStream).
 	 */
-	<S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState);
+	<S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 90d23cd..be8ac9d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -165,13 +165,13 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException(
 				"This state is only accessible by functions executed on a KeyedStream");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 44339ac..9bb444a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -125,6 +125,22 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
 	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
 		return this.serializer;
 	}
+
+	/**
+	 * Gets the class that represents the component type.
+	 * @return The class of the component type.
+	 */
+	public Class<?> getComponentClass() {
+		return this.arrayClass.getComponentType();
+	}
+
+	/**
+	 * Gets the type information of the component type.
+	 * @return The type information of the component type.
+	 */
+	public TypeInformation<?> getComponentType() {
+		return BasicTypeInfo.getInfoFor(getComponentClass());
+	}
 	
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 9718b72..b9fc3de 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -121,12 +121,12 @@ public class MockRuntimeContext implements RuntimeContext {
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException();
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
 		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c15ea9b..176a07f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -253,7 +253,7 @@ public class DataStream<T> {
 	 */
 	public KeyedStream<T, Tuple> keyBy(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+			return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
 		} else {
 			return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
 		}
@@ -291,7 +291,7 @@ public class DataStream<T> {
 	 */
 	public DataStream<T> partitionByHash(int... fields) {
 		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return partitionByHash(new KeySelectorUtil.ArrayKeySelector<T>(fields));
+			return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
 		} else {
 			return partitionByHash(new Keys.ExpressionKeys<T>(fields, getType()));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ca86627..9e60e9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,6 +35,9 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for all stream operators. Operators that contain a user function should extend the class 
  * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). 
@@ -77,14 +80,19 @@ public abstract class AbstractStreamOperator<OUT>
 	/** The runtime context for UDFs */
 	private transient StreamingRuntimeContext runtimeContext;
 
+	
 	// ---------------- key/value state ------------------
 	
 	/** key selector used to get the key for the state. Non-null only is the operator uses key/value state */
 	private transient KeySelector<?, ?> stateKeySelector;
 	
-	private transient KvState<?, ?, ?> keyValueState;
+	private transient KvState<?, ?, ?>[] keyValueStates;
+	
+	private transient HashMap<String, KvState<?, ?, ?>> keyValueStatesByName;
 	
-	private transient KvStateSnapshot<?, ?, ?> keyValueStateSnapshot;
+	private transient TypeSerializer<?> keySerializer;
+	
+	private transient HashMap<String, KvStateSnapshot<?, ?, ?>> keyValueStateSnapshots;
 	
 	// ------------------------------------------------------------------------
 	//  Life Cycle
@@ -133,8 +141,10 @@ public abstract class AbstractStreamOperator<OUT>
 	 */
 	@Override
 	public void dispose() {
-		if (keyValueState != null) {
-			keyValueState.dispose();
+		if (keyValueStates != null) {
+			for (KvState<?, ?, ?> state : keyValueStates) {
+				state.dispose();
+			}
 		}
 	}
 	
@@ -147,9 +157,15 @@ public abstract class AbstractStreamOperator<OUT>
 		// here, we deal with key/value state snapshots
 		
 		StreamTaskState state = new StreamTaskState();
-		if (keyValueState != null) {
-			KvStateSnapshot<?, ?, ?> snapshot = keyValueState.shapshot(checkpointId, timestamp);
-			state.setKvState(snapshot);
+		if (keyValueStates != null) {
+			HashMap<String, KvStateSnapshot<?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());
+			
+			for (Map.Entry<String, KvState<?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
+				KvStateSnapshot<?, ?, ?> snapshot = entry.getValue().shapshot(checkpointId, timestamp);
+				snapshots.put(entry.getKey(), snapshot);
+			}
+			
+			state.setKvStates(snapshots);
 		}
 		
 		return state;
@@ -159,7 +175,7 @@ public abstract class AbstractStreamOperator<OUT>
 	public void restoreState(StreamTaskState state) throws Exception {
 		// restore the key/value state. the actual restore happens lazily, when the function requests
 		// the state again, because the restore method needs information provided by the user function
-		keyValueStateSnapshot = state.getKvState();
+		keyValueStateSnapshots = state.getKvStates();
 	}
 	
 	@Override
@@ -232,9 +248,9 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
 	protected <V> OperatorState<V> createKeyValueState(
-			TypeInformation<V> stateType, V defaultValue) throws Exception
+			String name, TypeInformation<V> stateType, V defaultValue) throws Exception
 	{
-		return createKeyValueState(stateType.createSerializer(getExecutionConfig()), defaultValue);
+		return createKeyValueState(name, stateType.createSerializer(getExecutionConfig()), defaultValue);
 	}
 	
 	/**
@@ -253,12 +269,18 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
+	@SuppressWarnings({"rawtypes", "unchecked"})
 	protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
-			TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
+			String name, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
 	{
-		if (keyValueState != null) {
+		if (name == null || name.isEmpty()) {
+			throw new IllegalArgumentException();
+		}
+		if (keyValueStatesByName != null && keyValueStatesByName.containsKey(name)) {
 			throw new IllegalStateException("The key/value state has already been created");
 		}
+
+		TypeSerializer<K> keySerializer;
 		
 		// first time state access, make sure we load the state partitioner
 		if (stateKeySelector == null) {
@@ -267,46 +289,58 @@ public abstract class AbstractStreamOperator<OUT>
 				throw new UnsupportedOperationException("The function or operator is not executed " +
 						"on a KeyedStream and can hence not access the key/value state");
 			}
+
+			keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
+			if (keySerializer == null) {
+				throw new Exception("State key serializer has not been configured in the config.");
+			}
+			this.keySerializer = keySerializer;
 		}
-		
-		// create the key and value serializers
-		TypeSerializer<K> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
-		if (keySerializer == null) {
-			throw new Exception("State key serializer has not been configured in the config.");
+		else if (this.keySerializer != null) {
+			keySerializer = (TypeSerializer<K>) this.keySerializer;
+		}
+		else {
+			// should never happen, this is merely a safeguard
+			throw new RuntimeException();
 		}
 		
 		@SuppressWarnings("unchecked")
 		Backend stateBackend = (Backend) container.getStateBackend();
+
+		KvState<K, V, Backend> kvstate = null;
 		
 		// check whether we restore the key/value state from a snapshot, or create a new blank one
-		if (keyValueStateSnapshot != null) {
+		if (keyValueStateSnapshots != null) {
 			@SuppressWarnings("unchecked")
-			KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshot;
+			KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshots.remove(name);
 
-			KvState<K, V, Backend> kvstate = snapshot.restoreState(
-					stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
-			keyValueState = kvstate;
-			
-			// make sure we have no redundant copies in memory, let the GC clean up
-			keyValueStateSnapshot = null;
-			
-			return kvstate;
+			if (snapshot != null) {
+				kvstate = snapshot.restoreState(
+						stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
+			}
 		}
-		else {
+		
+		if (kvstate == null) {
 			// create a new blank key/value state
-			KvState<K, V, Backend> kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
-			keyValueState = kvstate;
-			return kvstate;
+			kvstate = stateBackend.createKvState(keySerializer, valueSerializer, defaultValue);
 		}
+
+		if (keyValueStatesByName == null) {
+			keyValueStatesByName = new HashMap<>();
+		}
+		keyValueStatesByName.put(name, kvstate);
+		keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
+		return kvstate;
 	}
 	
 	@Override
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public void setKeyContextElement(StreamRecord record) throws Exception {
-		if (stateKeySelector != null && keyValueState != null) {
-			KvState kv = keyValueState;
+		if (stateKeySelector != null && keyValueStates != null) {
 			KeySelector selector = stateKeySelector;
-			kv.setCurrentKey(selector.getKey(record.getValue()));
+			for (KvState kv : keyValueStates) {
+				kv.setCurrentKey(selector.getKey(record.getValue()));
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 79e319a..cf6b489 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -38,6 +38,8 @@ public class StreamGroupedFold<IN, OUT, KEY>
 		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
+	
+	private static final String STATE_NAME = "_op_state";
 
 	// Grouped values
 	private transient OperatorState<OUT> values;
@@ -68,7 +70,7 @@ public class StreamGroupedFold<IN, OUT, KEY>
 				new DataInputStream(bais)
 		);
 		initialValue = outTypeSerializer.deserialize(in);
-		values = createKeyValueState(outTypeSerializer, null);
+		values = createKeyValueState(STATE_NAME, outTypeSerializer, null);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index ebc4b09..ae15e92 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -27,6 +27,8 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 		implements OneInputStreamOperator<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
+
+	private static final String STATE_NAME = "_op_state";
 	
 	private transient OperatorState<IN> values;
 	
@@ -41,7 +43,7 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 	@Override
 	public void open() throws Exception {
 		super.open();
-		values = createKeyValueState(serializer, null);
+		values = createKeyValueState(STATE_NAME, serializer, null);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index a51bb27..87a9abd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -46,10 +47,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	private final Environment taskEnvironment;
 	
 	/** The key/value state, if the user-function requests it */
-	private OperatorState<?> keyValueState;
+	private HashMap<String, OperatorState<?>> keyValueStates;
 	
 	/** Type of the values stored in the state, to make sure repeated requests of the state are consistent */
-	private TypeInformation<?> stateTypeInfo;
+	private HashMap<String, TypeInformation<?>> stateTypeInfos;
 	
 	
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
@@ -107,7 +108,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(Class<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
 		requireNonNull(stateType, "The state type class must not be null");
 
 		TypeInformation<S> typeInfo;
@@ -120,35 +121,48 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 					"Please specify the TypeInformation directly.", e);
 		}
 		
-		return getKeyValueState(typeInfo, defaultState);
+		return getKeyValueState(name, typeInfo, defaultState);
 	}
 
 	@Override
-	public <S> OperatorState<S> getKeyValueState(TypeInformation<S> stateType, S defaultState) {
+	public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
+		requireNonNull(name, "The name of the state must not be null");
 		requireNonNull(stateType, "The state type information must not be null");
 		
+		OperatorState<?> previousState;
+		
 		// check if this is a repeated call to access the state 
-		if (this.stateTypeInfo != null && this.keyValueState != null) {
+		if (this.stateTypeInfos != null && this.keyValueStates != null &&
+				(previousState = this.keyValueStates.get(name)) != null) {
+			
 			// repeated call
-			if (this.stateTypeInfo.equals(stateType)) {
+			TypeInformation<?> previousType;
+			if (stateType.equals((previousType = this.stateTypeInfos.get(name)))) {
 				// valid case, same type requested again
 				@SuppressWarnings("unchecked")
-				OperatorState<S> previous = (OperatorState<S>) this.keyValueState;
+				OperatorState<S> previous = (OperatorState<S>) previousState;
 				return previous;
 			}
 			else {
 				// invalid case, different type requested this time
 				throw new IllegalStateException("Cannot initialize key/value state for type " + stateType +
 						" ; The key/value state has already been created and initialized for a different type: " +
-						this.stateTypeInfo);
+						previousType);
 			}
 		}
 		else {
 			// first time access to the key/value state
+			if (this.stateTypeInfos == null) {
+				this.stateTypeInfos = new HashMap<>();
+			}
+			if (this.keyValueStates == null) {
+				this.keyValueStates = new HashMap<>();
+			}
+			
 			try {
-				OperatorState<S> state = operator.createKeyValueState(stateType, defaultState);
-				this.keyValueState = state;
-				this.stateTypeInfo = stateType;
+				OperatorState<S> state = operator.createKeyValueState(name, stateType, defaultState);
+				this.keyValueStates.put(name, state);
+				this.stateTypeInfos.put(name, stateType);
 				return state;
 			}
 			catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
index 2fce7af..334fd44 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -22,6 +22,9 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.state.KvStateSnapshot;
 
 import java.io.Serializable;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
 
 /**
  * The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
@@ -40,7 +43,7 @@ public class StreamTaskState implements Serializable {
 
 	private StateHandle<Serializable> functionState;
 
-	private KvStateSnapshot<?, ?, ?> kvState;
+	private HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates;
 
 	// ------------------------------------------------------------------------
 
@@ -60,12 +63,12 @@ public class StreamTaskState implements Serializable {
 		this.functionState = functionState;
 	}
 
-	public KvStateSnapshot<?, ?, ?> getKvState() {
-		return kvState;
+	public HashMap<String, KvStateSnapshot<?, ?, ?>> getKvStates() {
+		return kvStates;
 	}
 
-	public void setKvState(KvStateSnapshot<?, ?, ?> kvState) {
-		this.kvState = kvState;
+	public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates) {
+		this.kvStates = kvStates;
 	}
 
 	// ------------------------------------------------------------------------
@@ -77,7 +80,7 @@ public class StreamTaskState implements Serializable {
 	 * @return True, if all state is null, false if at least one state is not null.
 	 */
 	public boolean isEmpty() {
-		return operatorState == null & functionState == null & kvState == null;
+		return operatorState == null & functionState == null & kvStates == null;
 	}
 
 	/**
@@ -89,7 +92,7 @@ public class StreamTaskState implements Serializable {
 	public void discardState() throws Exception {
 		StateHandle<?> operatorState = this.operatorState;
 		StateHandle<?> functionState = this.functionState;
-		KvStateSnapshot<?, ?, ?> kvState = this.kvState;
+		HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates = this.kvStates;
 		
 		if (operatorState != null) {
 			operatorState.discardState();
@@ -97,12 +100,25 @@ public class StreamTaskState implements Serializable {
 		if (functionState != null) {
 			functionState.discardState();
 		}
-		if (kvState != null) {
-			kvState.discardState();
+		if (kvStates != null) {
+			while (kvStates.size() > 0) {
+				try {
+					Iterator<KvStateSnapshot<?, ?, ?>> values = kvStates.values().iterator();
+					while (values.hasNext()) {
+						KvStateSnapshot<?, ?, ?> s = values.next();
+						s.discardState();
+						values.remove();
+					}
+				}
+				catch (ConcurrentModificationException e) {
+					// fall through the loop
+				}
+			}
 		}
 
 		this.operatorState = null;
 		this.functionState = null;
-		this.kvState = null;
+		this.kvStates = null;
 	}
 }
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 9c76d95..afbd8ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -18,10 +18,13 @@
 package org.apache.flink.streaming.util.keys;
 
 import java.lang.reflect.Array;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -31,6 +34,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Utility class that contains helper methods to manipulating {@link KeySelector} for streaming.
  */
@@ -49,7 +54,7 @@ public final class KeySelectorUtil {
 		
 		// use ascending order here, the code paths for that are usually a slight bit faster
 		boolean[] orders = new boolean[numKeyFields];
-		TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
+		TypeInformation<?>[] typeInfos = new TypeInformation<?>[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
 			orders[i] = true;
 			typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
@@ -59,31 +64,71 @@ public final class KeySelectorUtil {
 		return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
 	}
 
+	public static <X> ArrayKeySelector<X> getSelectorForArray(int[] positions, TypeInformation<X> typeInfo) {
+		if (positions == null || positions.length == 0 || positions.length > Tuple.MAX_ARITY) {
+			throw new IllegalArgumentException("Array keys must have between 1 and " + Tuple.MAX_ARITY + " fields.");
+		}
+		
+		TypeInformation<?> componentType;
+		
+		if (typeInfo instanceof BasicArrayTypeInfo) {
+			BasicArrayTypeInfo<X, ?>  arrayInfo = (BasicArrayTypeInfo<X, ?>) typeInfo;
+			componentType = arrayInfo.getComponentInfo();
+		}
+		else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
+			PrimitiveArrayTypeInfo<X> arrayType = (PrimitiveArrayTypeInfo<X>) typeInfo;
+			componentType = arrayType.getComponentType();
+		}
+		else {
+			throw new IllegalArgumentException("This method only supports arrays of primitives and boxed primitives.");
+		}
+		
+		TypeInformation<?>[] primitiveInfos = new TypeInformation<?>[positions.length];
+		Arrays.fill(primitiveInfos, componentType);
+
+		return new ArrayKeySelector<>(positions, new TupleTypeInfo<>(primitiveInfos));
+	}
+
 	
-	public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo,
-			ExecutionConfig executionConfig) {
+	public static <X, K> KeySelector<X, K> getSelectorForOneKey(
+			Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
+	{
+		if (!(typeInfo instanceof CompositeType)) {
+			throw new InvalidTypesException(
+					"This key operation requires a composite type such as Tuples, POJOs, case classes, etc");
+		}
 		if (partitioner != null) {
 			keys.validateCustomPartitioner(partitioner, null);
 		}
 
+		CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
 		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-
 		if (logicalKeyPositions.length != 1) {
 			throw new IllegalArgumentException("There must be exactly 1 key specified");
 		}
-
-		TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
-				logicalKeyPositions, new boolean[1], 0, executionConfig);
+		
+		TypeComparator<X> comparator = compositeType.createComparator(
+				logicalKeyPositions, new boolean[] { true }, 0, executionConfig);
 		return new OneKeySelector<>(comparator);
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Private constructor to prevent instantiation.
 	 */
 	private KeySelectorUtil() {
 		throw new RuntimeException();
 	}
-	
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Key extractor that extracts a single field via a generic comparator. 
+	 * 
+	 * @param <IN> The type of the elements where the key is extracted from.
+	 * @param <K> The type of the key.
+	 */
 	public static final class OneKeySelector<IN, K> implements KeySelector<IN, K> {
 
 		private static final long serialVersionUID = 1L;
@@ -94,8 +139,8 @@ public final class KeySelectorUtil {
 		 * are null), it does not have any serialization problems */
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final Object[] keyArray;
-
-		public OneKeySelector(TypeComparator<IN> comparator) {
+		
+		OneKeySelector(TypeComparator<IN> comparator) {
 			this.comparator = comparator;
 			this.keyArray = new Object[1];
 		}
@@ -121,18 +166,18 @@ public final class KeySelectorUtil {
 
 		private final TypeComparator<IN> comparator;
 		private final int keyLength;
-		private final TupleTypeInfo tupleTypeInfo;
+		private transient TupleTypeInfo<Tuple> tupleTypeInfo;
 
 		/** Reusable array to hold the key objects. Since this is initially empty (all positions
 		 * are null), it does not have any serialization problems */
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final Object[] keyArray;
 
-		public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
+		ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo<Tuple> tupleTypeInfo) {
 			this.comparator = comparator;
 			this.keyLength = keyLength;
 			this.tupleTypeInfo = tupleTypeInfo;
-			keyArray = new Object[keyLength];
+			this.keyArray = new Object[keyLength];
 		}
 
 		@Override
@@ -147,6 +192,9 @@ public final class KeySelectorUtil {
 
 		@Override
 		public TypeInformation<Tuple> getProducedType() {
+			if (tupleTypeInfo == null) {
+				throw new IllegalStateException("The return type information is not available after serialization");
+			}
 			return tupleTypeInfo;
 		}
 	}
@@ -158,23 +206,35 @@ public final class KeySelectorUtil {
 	 * 
 	 * @param <IN> The type from which the key is extracted, i.e., the array type.
 	 */
-	public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple> {
+	public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
 
 		private static final long serialVersionUID = 1L;
 		
 		private final int[] fields;
+		private final Class<? extends Tuple> tupleClass;
+		private transient TupleTypeInfo<Tuple> returnType;
 
-		public ArrayKeySelector(int... fields) {
-			this.fields = fields;
+		ArrayKeySelector(int[] fields, TupleTypeInfo<Tuple> returnType) {
+			this.fields = requireNonNull(fields);
+			this.returnType = requireNonNull(returnType);
+			this.tupleClass = Tuple.getTupleClass(fields.length);
 		}
 
 		@Override
 		public Tuple getKey(IN value) throws Exception {
-			Tuple key = Tuple.getTupleClass(fields.length).newInstance();
+			Tuple key = tupleClass.newInstance();
 			for (int i = 0; i < fields.length; i++) {
 				key.setField(Array.get(value, fields[i]), i);
 			}
 			return key;
 		}
+
+		@Override
+		public TypeInformation<Tuple> getProducedType() {
+			if (returnType == null) {
+				throw new IllegalStateException("The return type information is not available after serialization");
+			}
+			return returnType;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index fdf7697..68a047c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -25,9 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
index 9d807cf..63375a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.util.keys;
 
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
@@ -29,15 +31,17 @@ public class ArrayKeySelectorTest {
 	@Test
 	public void testObjectArrays() {
 		try {
-			Object[] array1 = { "a", "b", "c", "d", "e" };
-			Object[] array2 = { "v", "w", "x", "y", "z" };
+			String[] array1 = { "a", "b", "c", "d", "e" };
+			String[] array2 = { "v", "w", "x", "y", "z" };
 			
-			KeySelectorUtil.ArrayKeySelector<Object[]> singleFieldSelector = new KeySelectorUtil.ArrayKeySelector<>(1);
+			KeySelectorUtil.ArrayKeySelector<String[]> singleFieldSelector =
+					KeySelectorUtil.getSelectorForArray(new int[] {1}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
 			
 			assertEquals(new Tuple1<>("b"), singleFieldSelector.getKey(array1));
 			assertEquals(new Tuple1<>("w"), singleFieldSelector.getKey(array2));
 
-			KeySelectorUtil.ArrayKeySelector<Object[]> twoFieldsSelector = new KeySelectorUtil.ArrayKeySelector<>(3, 0);
+			KeySelectorUtil.ArrayKeySelector<String[]> twoFieldsSelector =
+					KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
 			
 			assertEquals(new Tuple2<>("d", "a"), twoFieldsSelector.getKey(array1));
 			assertEquals(new Tuple2<>("y", "v"), twoFieldsSelector.getKey(array2));
@@ -55,13 +59,15 @@ public class ArrayKeySelectorTest {
 			int[] array1 = { 1, 2, 3, 4, 5 };
 			int[] array2 = { -5, -4, -3, -2, -1, 0 };
 
-			KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector = new KeySelectorUtil.ArrayKeySelector<>(1);
+			KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector = 
+					KeySelectorUtil.getSelectorForArray(new int[] {1}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
 
 			assertEquals(new Tuple1<>(2), singleFieldSelector.getKey(array1));
 			assertEquals(new Tuple1<>(-4), singleFieldSelector.getKey(array2));
 
-			KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector = new KeySelectorUtil.ArrayKeySelector<>(3, 0);
-
+			KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector =
+					KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+			
 			assertEquals(new Tuple2<>(4, 1), twoFieldsSelector.getKey(array1));
 			assertEquals(new Tuple2<>(-2, -5), twoFieldsSelector.getKey(array2));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index f7413b7..3ff773f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -362,4 +362,3 @@ class KeySelectorWithType[IN, K](
 
   override def getProducedType: TypeInformation[K] = info
 }
-  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 84354a3..9f5c069 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -36,6 +36,15 @@ import scala.reflect.ClassTag
 class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
 
   // ------------------------------------------------------------------------
+  //  Properties
+  // ------------------------------------------------------------------------
+
+  /**
+   * Gets the type of the key by which this stream is keyed.
+   */
+  def getKeyType = javaStream.getKeyType()
+  
+  // ------------------------------------------------------------------------
   //  Windowing
   // ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 5a591a8..d66cfdb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -43,6 +43,6 @@ trait StatefulFunction[I, O, S] extends RichFunction {
   }
 
   override def open(c: Configuration) = {
-    state = getRuntimeContext().getKeyValueState[S](stateType, null.asInstanceOf[S])
+    state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index fe85fd1..988e7ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -118,7 +118,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val partition1: DataStream[_] = src1.partitionByHash(0)
     val partition2: DataStream[_] = src1.partitionByHash(1, 0)
     val partition3: DataStream[_] = src1.partitionByHash("_1")
-    val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1);
+    val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1)
 
     val pid1 = createDownStreamId(partition1)
     val pid2 = createDownStreamId(partition2)

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index 7904bcb..b2e05b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -31,11 +31,12 @@ object StateTestPrograms {
     
     // test stateful map
     env.generateSequence(0, 10).setParallelism(1)
-      .keyBy(x => x)
+      .map { v => (1, v) }.setParallelism(1)
+      .keyBy(_._1)
       .mapWithState((in, count: Option[Long]) =>
         count match {
-          case Some(c) => (in - c, Some(c + 1))
-          case None => (in, Some(1L))
+          case Some(c) => (in._2 - c, Some(c + 1))
+          case None => (in._2, Some(1L))
         }).setParallelism(1)
       
       .addSink(new RichSinkFunction[Long]() {
@@ -49,12 +50,12 @@ object StateTestPrograms {
       })
 
     // test stateful flatmap
-    env.fromElements("Fir st-", "Hello world")
-      .keyBy(x => x)
+    env.fromElements((1, "First"), (2, "Second"), (1, "Hello world"))
+      .keyBy(_._1)
       .flatMapWithState((w, s: Option[String]) =>
         s match {
-          case Some(state) => (w.split(" ").toList.map(state + _), Some(w))
-          case None => (List(w), Some(w))
+          case Some(state) => (w._2.split(" ").toList.map(state + _), Some(w._2))
+          case None => (List(w._2), Some(w._2))
         })
       .setParallelism(1)
       
@@ -62,10 +63,11 @@ object StateTestPrograms {
         val received = new util.HashSet[String]()
         override def invoke(in: String) = { received.add(in) }
         override def close() = {
-          assert(received.size() == 3)
-          assert(received.contains("Fir st-"))
-          assert(received.contains("Fir st-Hello"))
-          assert(received.contains("Fir st-world"))
+          assert(received.size() == 4)
+          assert(received.contains("First"))
+          assert(received.contains("Second"))
+          assert(received.contains("FirstHello"))
+          assert(received.contains("Firstworld"))
         }
       }).setParallelism(1)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 0fcedda..67c0189 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -107,7 +107,9 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 		@Override
 		public void open(Configuration parameters) throws IOException {
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
-			index = getRuntimeContext().getIndexOfThisSubtask();
+			if (index == 0) {
+				index = getRuntimeContext().getIndexOfThisSubtask();
+			}
 		}
 
 		@Override
@@ -165,7 +167,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
-			sum = getRuntimeContext().getKeyValueState(Long.class, 0L);
+			sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L);
 		}
 
 		@Override
@@ -187,17 +189,26 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
 		private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
 
-		private OperatorState<NonSerializableLong> counts;
+		private OperatorState<NonSerializableLong> aCounts;
+		private OperatorState<Long> bCounts;
 
 		@Override
 		public void open(Configuration parameters) throws IOException {
-			counts = getRuntimeContext().getKeyValueState(NonSerializableLong.class, NonSerializableLong.of(0L));
+			aCounts = getRuntimeContext().getKeyValueState(
+					"a", NonSerializableLong.class, NonSerializableLong.of(0L));
+			bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L);
 		}
 
 		@Override
 		public void invoke(Tuple2<Integer, Long> value) throws Exception {
-			long currentCount = counts.value().value + 1;
-			counts.update(NonSerializableLong.of(currentCount));
+			long ac = aCounts.value().value;
+			long bc = bCounts.value();
+			assertEquals(ac, bc);
+			
+			long currentCount = ac + 1;
+			aCounts.update(NonSerializableLong.of(currentCount));
+			bCounts.update(currentCount);
+			
 			allCounts.put(value.f0, currentCount);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c205432/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 992a679..e98696e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -252,7 +252,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
 			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
-			pCount = getRuntimeContext().getKeyValueState(Long.class, 0L);
+			pCount = getRuntimeContext().getKeyValueState("pCount", Long.class, 0L);
 		}
 		
 		@Override