You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/23 12:01:22 UTC

[2/5] flink git commit: [FLINK-4556] [distributed runtime] Make Queryable State Key-Group Aware

[FLINK-4556] [distributed runtime] Make Queryable State Key-Group Aware

This closes #2523


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/181b5451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/181b5451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/181b5451

Branch: refs/heads/master
Commit: 181b54515ed2701a25d4a71fa7ed52394b2aeb66
Parents: 4ee048b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Sep 20 16:36:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:24:42 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateLocation.java    | 47 ++++++-----
 .../runtime/query/KvStateLocationRegistry.java  | 16 ++--
 .../flink/runtime/query/KvStateMessage.java     | 35 ++++----
 .../flink/runtime/query/KvStateRegistry.java    | 12 +--
 .../runtime/query/KvStateRegistryListener.java  |  9 ++-
 .../runtime/query/QueryableStateClient.java     |  4 +-
 .../runtime/query/TaskKvStateRegistry.java      | 17 ++--
 .../runtime/state/KeyGroupRangeAssignment.java  | 15 +++-
 .../flink/runtime/state/KeyedStateBackend.java  |  8 +-
 .../ActorGatewayKvStateRegistryListener.java    |  9 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +-
 .../runtime/jobmanager/JobManagerTest.java      | 20 ++---
 .../query/KvStateLocationRegistryTest.java      | 35 ++++----
 .../runtime/query/KvStateLocationTest.java      | 85 ++++++++++++++------
 .../runtime/query/QueryableStateClientTest.java | 10 +--
 .../runtime/query/netty/KvStateClientTest.java  |  2 +-
 .../query/netty/KvStateServerHandlerTest.java   | 10 +--
 .../runtime/state/StateBackendTestBase.java     |  9 ++-
 .../flink/test/query/QueryableStateITCase.java  |  1 -
 19 files changed, 207 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 9be22c2..90bb2a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -160,44 +161,50 @@ public class KvStateLocation implements Serializable {
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
-	 * @param keyGroupIndex  Key group index to register
+	 * @param keyGroupRange  Key group range to register
 	 * @param kvStateId      ID of the KvState instance at the key group index.
 	 * @param kvStateAddress Server address of the KvState instance at the key group index.
-	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
 	 */
-	void registerKvState(int keyGroupIndex, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
-		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+	void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+
+		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}
 
-		if (kvStateIds[keyGroupIndex] == null && kvStateAddresses[keyGroupIndex] == null) {
-			numRegisteredKeyGroups++;
-		}
+		for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+
+			if (kvStateIds[kgIdx] == null && kvStateAddresses[kgIdx] == null) {
+				numRegisteredKeyGroups++;
+			}
 
-		kvStateIds[keyGroupIndex] = kvStateId;
-		kvStateAddresses[keyGroupIndex] = kvStateAddress;
+			kvStateIds[kgIdx] = kvStateId;
+			kvStateAddresses[kgIdx] = kvStateAddress;
+		}
 	}
 
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
-	 * @param keyGroupIndex Key group index to unregister.
-	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
-	 * @throws IllegalArgumentException If no location information registered for key group index.
+	 * @param keyGroupRange Key group range to unregister.
+	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
+	 * @throws IllegalArgumentException  If no location information registered for a key group index in the range.
 	 */
-	void unregisterKvState(int keyGroupIndex) {
-		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+	void unregisterKvState(KeyGroupRange keyGroupRange) {
+		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}
 
-		if (kvStateIds[keyGroupIndex] == null || kvStateAddresses[keyGroupIndex] == null) {
-			throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
-		}
+		for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+			if (kvStateIds[kgIdx] == null || kvStateAddresses[kgIdx] == null) {
+				throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
+			}
 
-		numRegisteredKeyGroups--;
+			numRegisteredKeyGroups--;
 
-		kvStateIds[keyGroupIndex] = null;
-		kvStateAddresses[keyGroupIndex] = null;
+			kvStateIds[kgIdx] = null;
+			kvStateAddresses[kgIdx] = null;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
index 5b76598..c489025 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -73,7 +74,7 @@ public class KvStateLocationRegistry {
 	 * Notifies the registry about a registered KvState instance.
 	 *
 	 * @param jobVertexId JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param keyGroupRange Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState has been registered
 	 * @param kvStateId ID of the registered KvState instance
 	 * @param kvStateServerAddress Server address where to find the KvState instance
@@ -85,7 +86,7 @@ public class KvStateLocationRegistry {
 	 */
 	public void notifyKvStateRegistered(
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId,
 			KvStateServerAddress kvStateServerAddress) {
@@ -97,7 +98,7 @@ public class KvStateLocationRegistry {
 			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
 
 			if (vertex != null) {
-				int parallelism = vertex.getParallelism();
+				int parallelism = vertex.getMaxParallelism();
 				location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
 				lookupTable.put(registrationName, location);
 			} else {
@@ -119,22 +120,21 @@ public class KvStateLocationRegistry {
 
 			throw duplicate;
 		}
-
-		location.registerKvState(keyGroupIndex, kvStateId, kvStateServerAddress);
+		location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
 	}
 
 	/**
 	 * Notifies the registry about an unregistered KvState instance.
 	 *
 	 * @param jobVertexId JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param keyGroupRange Key group index the KvState instance belongs to
 	 * @param registrationName Name under which the KvState has been registered
 	 * @throws IllegalArgumentException If another operator registered the state instance
 	 * @throws IllegalArgumentException If the registration name is not known
 	 */
 	public void notifyKvStateUnregistered(
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName) {
 
 		KvStateLocation location = lookupTable.get(registrationName);
@@ -147,7 +147,7 @@ public class KvStateLocationRegistry {
 						"under '" + registrationName + "'.");
 			}
 
-			location.unregisterKvState(keyGroupIndex);
+			location.unregisterKvState(keyGroupRange);
 
 			if (location.getNumRegisteredKeyGroups() == 0) {
 				lookupTable.remove(registrationName);

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
index 5e3c38e..857b8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -97,8 +98,8 @@ public interface KvStateMessage extends Serializable {
 		/** JobVertexID the KvState instance belongs to. */
 		private final JobVertexID jobVertexId;
 
-		/** Key group index the KvState instance belongs to. */
-		private final int keyGroupIndex;
+		/** Key group range the KvState instance belongs to. */
+		private final KeyGroupRange keyGroupRange;
 
 		/** Name under which the KvState has been registered. */
 		private final String registrationName;
@@ -114,7 +115,7 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to
-		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param keyGroupRange        Key group range the KvState instance belongs to
 		 * @param registrationName     Name under which the KvState has been registered
 		 * @param kvStateId            ID of the registered KvState instance
 		 * @param kvStateServerAddress Server address where to find the KvState instance
@@ -122,15 +123,15 @@ public interface KvStateMessage extends Serializable {
 		public NotifyKvStateRegistered(
 				JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName,
 				KvStateID kvStateId,
 				KvStateServerAddress kvStateServerAddress) {
 
 			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
 			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
-			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
-			this.keyGroupIndex = keyGroupIndex;
+			Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+			this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
 			this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
 			this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
@@ -159,8 +160,8 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @return Key group index the KvState instance belongs to
 		 */
-		public int getKeyGroupIndex() {
-			return keyGroupIndex;
+		public KeyGroupRange getKeyGroupRange() {
+			return keyGroupRange;
 		}
 
 		/**
@@ -195,7 +196,7 @@ public interface KvStateMessage extends Serializable {
 			return "NotifyKvStateRegistered{" +
 					"jobId=" + jobId +
 					", jobVertexId=" + jobVertexId +
-					", keyGroupIndex=" + keyGroupIndex +
+					", keyGroupRange=" + keyGroupRange +
 					", registrationName='" + registrationName + '\'' +
 					", kvStateId=" + kvStateId +
 					", kvStateServerAddress=" + kvStateServerAddress +
@@ -214,7 +215,7 @@ public interface KvStateMessage extends Serializable {
 		private final JobVertexID jobVertexId;
 
 		/** Key group index the KvState instance belongs to. */
-		private final int keyGroupIndex;
+		private final KeyGroupRange keyGroupRange;
 
 		/** Name under which the KvState has been registered. */
 		private final String registrationName;
@@ -224,19 +225,19 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to
-		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param keyGroupRange        Key group range the KvState instance belongs to
 		 * @param registrationName     Name under which the KvState has been registered
 		 */
 		public NotifyKvStateUnregistered(
 				JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName) {
 
 			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
 			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
-			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
-			this.keyGroupIndex = keyGroupIndex;
+			Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+			this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
 		}
 
@@ -263,8 +264,8 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @return Key group index the KvState instance belongs to
 		 */
-		public int getKeyGroupIndex() {
-			return keyGroupIndex;
+		public KeyGroupRange getKeyGroupRange() {
+			return keyGroupRange;
 		}
 
 		/**
@@ -281,7 +282,7 @@ public interface KvStateMessage extends Serializable {
 			return "NotifyKvStateUnregistered{" +
 					"jobId=" + jobId +
 					", jobVertexId=" + jobVertexId +
-					", keyGroupIndex=" + keyGroupIndex +
+					", keyGroupRange=" + keyGroupRange +
 					", registrationName='" + registrationName + '\'' +
 					'}';
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index f19c123..f57ae47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.Task;
 
@@ -80,7 +81,7 @@ public class KvStateRegistry {
 	 *
 	 * @param jobId            JobId the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 * @param kvState          KvState instance to be registered
 	 * @return Assigned KvStateID
@@ -88,7 +89,7 @@ public class KvStateRegistry {
 	public KvStateID registerKvState(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvState<?> kvState) {
 
@@ -100,7 +101,7 @@ public class KvStateRegistry {
 				listener.notifyKvStateRegistered(
 						jobId,
 						jobVertexId,
-						keyGroupIndex,
+						keyGroupRange,
 						registrationName,
 						kvStateId);
 			}
@@ -116,11 +117,12 @@ public class KvStateRegistry {
 	 *
 	 * @param jobId     JobId the KvState instance belongs to
 	 * @param kvStateId KvStateID to identify the KvState instance
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 */
 	public void unregisterKvState(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId) {
 
@@ -130,7 +132,7 @@ public class KvStateRegistry {
 				listener.notifyKvStateUnregistered(
 						jobId,
 						jobVertexId,
-						keyGroupIndex,
+						keyGroupRange,
 						registrationName);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
index 760adf1..29bee9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 
 /**
  * A listener for a {@link KvStateRegistry}.
@@ -34,14 +35,14 @@ public interface KvStateRegistryListener {
 	 *
 	 * @param jobId            Job ID the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 * @param kvStateId        ID of the KvState instance
 	 */
 	void notifyKvStateRegistered(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId);
 
@@ -50,13 +51,13 @@ public interface KvStateRegistryListener {
 	 *
 	 * @param jobId            Job ID the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 */
 	void notifyKvStateUnregistered(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 0e1ea57..591c67d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -296,7 +296,7 @@ public class QueryableStateClient {
 				.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
 					@Override
 					public Future<byte[]> apply(KvStateLocation lookup) {
-						int keyGroupIndex = MathUtils.murmurHash(keyHashCode) % lookup.getNumKeyGroups();
+						int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups());
 
 						KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
 						if (serverAddress == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index b5c09aa..d831214 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -52,15 +53,15 @@ public class TaskKvStateRegistry {
 	/**
 	 * Registers the KvState instance at the KvStateRegistry.
 	 *
-	 * @param keyGroupIndex    KeyGroupIndex the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName The registration name (not necessarily the same
 	 *                         as the KvState name defined in the state
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(int keyGroupIndex, String registrationName, KvState<?> kvState) {
-		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState);
-		registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId));
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, KvState<?> kvState) {
+		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
+		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
 	}
 
 	/**
@@ -68,7 +69,7 @@ public class TaskKvStateRegistry {
 	 */
 	public void unregisterAll() {
 		for (KvStateInfo kvState : registeredKvStates) {
-			registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupIndex, kvState.registrationName, kvState.kvStateId);
+			registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupRange, kvState.registrationName, kvState.kvStateId);
 		}
 	}
 
@@ -77,14 +78,14 @@ public class TaskKvStateRegistry {
 	 */
 	private static class KvStateInfo {
 
-		private final int keyGroupIndex;
+		private final KeyGroupRange keyGroupRange;
 
 		private final String registrationName;
 
 		private final KvStateID kvStateId;
 
-		public KvStateInfo(int keyGroupIndex, String registrationName, KvStateID kvStateId) {
-			this.keyGroupIndex = keyGroupIndex;
+		public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+			this.keyGroupRange = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index eceb6f4..894f721 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -48,8 +48,19 @@ public final class KeyGroupRangeAssignment {
 	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 	 * @return the key-group to which the given key is assigned
 	 */
-	public static final int assignToKeyGroup(Object key, int maxParallelism) {
-		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+	public static int assignToKeyGroup(Object key, int maxParallelism) {
+		return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
+	}
+
+	/**
+	 * Assigns the given key to a key-group index.
+	 *
+	 * @param keyHash the hash of the key to assign
+	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+	 * @return the key-group to which the given key is assigned
+	 */
+	public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
+		return MathUtils.murmurHash(keyHash) % maxParallelism;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 8db63ee..5612f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -269,8 +269,7 @@ public abstract class KeyedStateBackend<K> {
 			}
 
 			String name = stateDescriptor.getQueryableStateName();
-			// TODO: deal with key group indices here
-			kvStateRegistry.registerKvState(0, name, kvState);
+			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
 		}
 
 		return state;
@@ -336,4 +335,9 @@ public abstract class KeyedStateBackend<K> {
 			long checkpointId,
 			long timestamp,
 			CheckpointStreamFactory streamFactory) throws Exception;
+
+
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
index 2d69938..4404867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -49,14 +50,14 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
 	public void notifyKvStateRegistered(
 		JobID jobId,
 		JobVertexID jobVertexId,
-		int keyGroupIndex,
+		KeyGroupRange keyGroupRange,
 		String registrationName,
 		KvStateID kvStateId) {
 
 		Object msg = new KvStateMessage.NotifyKvStateRegistered(
 			jobId,
 			jobVertexId,
-			keyGroupIndex,
+			keyGroupRange,
 			registrationName,
 			kvStateId,
 			kvStateServerAddress);
@@ -68,13 +69,13 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
 	public void notifyKvStateUnregistered(
 		JobID jobId,
 		JobVertexID jobVertexId,
-		int keyGroupIndex,
+		KeyGroupRange keyGroupRange,
 		String registrationName) {
 
 		Object msg = new KvStateMessage.NotifyKvStateUnregistered(
 			jobId,
 			jobVertexId,
-			keyGroupIndex,
+			keyGroupRange,
 			registrationName);
 
 		jobManager.tell(msg);

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 639c158..fd96f86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1500,7 +1500,7 @@ class JobManager(
 
               graph.getKvStateLocationRegistry.notifyKvStateRegistered(
                 msg.getJobVertexId,
-                msg.getKeyGroupIndex,
+                msg.getKeyGroupRange,
                 msg.getRegistrationName,
                 msg.getKvStateId,
                 msg.getKvStateServerAddress)
@@ -1519,7 +1519,7 @@ class JobManager(
             try {
               graph.getKvStateLocationRegistry.notifyKvStateUnregistered(
                 msg.getJobVertexId,
-                msg.getKeyGroupIndex,
+                msg.getKeyGroupRange,
                 msg.getRegistrationName)
             } catch {
               case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f925d62..d731b95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -467,7 +468,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered registerNonExistingJob = new NotifyKvStateRegistered(
 				new JobID(),
 				new JobVertexID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"any-name",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
@@ -492,7 +493,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered registerForExistingJob = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex1.getID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"register-me",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -512,11 +513,12 @@ public class JobManagerTest {
 
 		assertEquals(jobGraph.getJobID(), location.getJobId());
 		assertEquals(jobVertex1.getID(), location.getJobVertexId());
-		assertEquals(jobVertex1.getParallelism(), location.getNumKeyGroups());
+		assertEquals(jobVertex1.getMaxParallelism(), location.getNumKeyGroups());
 		assertEquals(1, location.getNumRegisteredKeyGroups());
-		int keyGroupIndex = registerForExistingJob.getKeyGroupIndex();
-		assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupIndex));
-		assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupIndex));
+		KeyGroupRange keyGroupRange = registerForExistingJob.getKeyGroupRange();
+		assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
+		assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupRange.getStartKeyGroup()));
+		assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
 
 		//
 		// Unregistration
@@ -524,7 +526,7 @@ public class JobManagerTest {
 		NotifyKvStateUnregistered unregister = new NotifyKvStateUnregistered(
 				registerForExistingJob.getJobId(),
 				registerForExistingJob.getJobVertexId(),
-				registerForExistingJob.getKeyGroupIndex(),
+				registerForExistingJob.getKeyGroupRange(),
 				registerForExistingJob.getRegistrationName());
 
 		jobManager.tell(unregister);
@@ -546,7 +548,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered register = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex1.getID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"duplicate-me",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -554,7 +556,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex2.getID(), // <--- different operator, but...
-				0,
+				new KeyGroupRange(0, 0),
 				"duplicate-me", // ...same name
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
index 70f0ba2..f8005a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -52,8 +53,8 @@ public class KvStateLocationRegistryTest {
 		// IDs for each key group of each vertex
 		KvStateID[][] ids = new KvStateID[vertices.length][];
 		for (int i = 0; i < ids.length; i++) {
-			ids[i] = new KvStateID[vertices[i].getParallelism()];
-			for (int j = 0; j < vertices[i].getParallelism(); j++) {
+			ids[i] = new KvStateID[vertices[i].getMaxParallelism()];
+			for (int j = 0; j < vertices[i].getMaxParallelism(); j++) {
 				ids[i][j] = new KvStateID();
 			}
 		}
@@ -66,12 +67,12 @@ public class KvStateLocationRegistryTest {
 
 		// Register
 		for (int i = 0; i < vertices.length; i++) {
-			int numKeyGroups = vertices[i].getParallelism();
+			int numKeyGroups = vertices[i].getMaxParallelism();
 			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
 				// Register
 				registry.notifyKvStateRegistered(
 						vertices[i].getJobVertexId(),
-						keyGroupIndex,
+						new KeyGroupRange(keyGroupIndex, keyGroupIndex),
 						registrationNames[i],
 						ids[i][keyGroupIndex],
 						server);
@@ -83,8 +84,8 @@ public class KvStateLocationRegistryTest {
 			KvStateLocation location = registry.getKvStateLocation(registrationNames[i]);
 			assertNotNull(location);
 
-			int parallelism = vertices[i].getParallelism();
-			for (int keyGroupIndex = 0; keyGroupIndex < parallelism; keyGroupIndex++) {
+			int maxParallelism = vertices[i].getMaxParallelism();
+			for (int keyGroupIndex = 0; keyGroupIndex < maxParallelism; keyGroupIndex++) {
 				assertEquals(ids[i][keyGroupIndex], location.getKvStateID(keyGroupIndex));
 				assertEquals(server, location.getKvStateServerAddress(keyGroupIndex));
 			}
@@ -92,10 +93,10 @@ public class KvStateLocationRegistryTest {
 
 		// Unregister
 		for (int i = 0; i < vertices.length; i++) {
-			int numKeyGroups = vertices[i].getParallelism();
+			int numKeyGroups = vertices[i].getMaxParallelism();
 			JobVertexID jobVertexId = vertices[i].getJobVertexId();
 			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-				registry.notifyKvStateUnregistered(jobVertexId, keyGroupIndex, registrationNames[i]);
+				registry.notifyKvStateUnregistered(jobVertexId, new KeyGroupRange(keyGroupIndex, keyGroupIndex), registrationNames[i]);
 			}
 		}
 
@@ -121,7 +122,7 @@ public class KvStateLocationRegistryTest {
 		// First operator registers
 		registry.notifyKvStateRegistered(
 				vertices[0].getJobVertexId(),
-				0,
+				new KeyGroupRange(0, 0),
 				registrationName,
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 12328));
@@ -130,7 +131,7 @@ public class KvStateLocationRegistryTest {
 			// Second operator registers same name
 			registry.notifyKvStateRegistered(
 					vertices[1].getJobVertexId(),
-					0,
+					new KeyGroupRange(0, 0),
 					registrationName,
 					new KvStateID(),
 					new KvStateServerAddress(InetAddress.getLocalHost(), 12032));
@@ -151,7 +152,7 @@ public class KvStateLocationRegistryTest {
 
 		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
 		try {
-			registry.notifyKvStateUnregistered(vertex.getJobVertexId(), 0, "any-name");
+			registry.notifyKvStateUnregistered(vertex.getJobVertexId(), new KeyGroupRange(0, 0), "any-name");
 			fail("Did not throw expected Exception, because of missing registration");
 		} catch (IllegalArgumentException ignored) {
 			// Expected
@@ -179,7 +180,7 @@ public class KvStateLocationRegistryTest {
 		// First operator registers name
 		registry.notifyKvStateRegistered(
 				vertices[0].getJobVertexId(),
-				0,
+				new KeyGroupRange(0, 0),
 				name,
 				new KvStateID(),
 				mock(KvStateServerAddress.class));
@@ -190,7 +191,7 @@ public class KvStateLocationRegistryTest {
 
 			registry.notifyKvStateUnregistered(
 					vertices[0].getJobVertexId(),
-					notRegisteredKeyGroupIndex,
+					new KeyGroupRange(notRegisteredKeyGroupIndex, notRegisteredKeyGroupIndex),
 					name);
 
 			fail("Did not throw expected Exception");
@@ -201,7 +202,7 @@ public class KvStateLocationRegistryTest {
 			// Wrong operator tries to unregister
 			registry.notifyKvStateUnregistered(
 					vertices[1].getJobVertexId(),
-					0,
+					new KeyGroupRange(0, 0),
 					name);
 
 			fail("Did not throw expected Exception");
@@ -210,13 +211,13 @@ public class KvStateLocationRegistryTest {
 	}
 
 	// ------------------------------------------------------------------------
-	
-	private ExecutionJobVertex createJobVertex(int parallelism) {
+
+	private ExecutionJobVertex createJobVertex(int maxParallelism) {
 		JobVertexID id = new JobVertexID();
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 
 		when(vertex.getJobVertexId()).thenReturn(id);
-		when(vertex.getParallelism()).thenReturn(parallelism);
+		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
 		return vertex;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index 59ac575..ed51f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -36,55 +39,87 @@ public class KvStateLocationTest {
 		JobID jobId = new JobID();
 		JobVertexID jobVertexId = new JobVertexID();
 		int numKeyGroups = 123;
+		int numRanges = 10;
+		int fract = numKeyGroups / numRanges;
+		int remain = numKeyGroups % numRanges;
+		List<KeyGroupRange> keyGroupRanges = new ArrayList<>(numRanges);
+
+		int start = 0;
+		for (int i = 0; i < numRanges; ++i) {
+			int end = start + fract - 1;
+			if(remain > 0) {
+				--remain;
+				++end;
+			}
+ 			KeyGroupRange range = new KeyGroupRange(start, end);
+			keyGroupRanges.add(range);
+			start = end + 1;
+		}
+
+		System.out.println(keyGroupRanges);
+
 		String registrationName = "asdasdasdasd";
 
 		KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
 
-		KvStateID[] kvStateIds = new KvStateID[numKeyGroups];
-		KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numKeyGroups];
+		KvStateID[] kvStateIds = new KvStateID[numRanges];
+		KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges];
 
 		InetAddress host = InetAddress.getLocalHost();
 
 		// Register
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			kvStateIds[keyGroupIndex] = new KvStateID();
-			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
-
-			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
-			assertEquals(keyGroupIndex + 1, location.getNumRegisteredKeyGroups());
+		int registeredCount = 0;
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			kvStateIds[rangeIdx] = new KvStateID();
+			serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+			registeredCount += keyGroupRange.getNumberOfKeyGroups();
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
-			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+				assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		// Overwrite
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			kvStateIds[keyGroupIndex] = new KvStateID();
-			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			kvStateIds[rangeIdx] = new KvStateID();
+			serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
 
-			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
-			assertEquals(numKeyGroups, location.getNumRegisteredKeyGroups());
+			location.registerKvState(keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
-			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+				assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		// Unregister
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			location.unregisterKvState(keyGroupIndex);
-			assertEquals(numKeyGroups - keyGroupIndex - 1, location.getNumRegisteredKeyGroups());
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			location.unregisterKvState(keyGroupRange);
+			registeredCount -= keyGroupRange.getNumberOfKeyGroups();
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(null, location.getKvStateID(keyGroupIndex));
-			assertEquals(null, location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(null, location.getKvStateID(keyGroup));
+				assertEquals(null, location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		assertEquals(0, location.getNumRegisteredKeyGroups());

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 405f962..1039568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -154,7 +154,7 @@ public class QueryableStateClientTest {
 			KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
 			KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
 			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(i, kvStateId, serverAddress);
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
 			}
 
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
@@ -184,7 +184,7 @@ public class QueryableStateClientTest {
 			serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
 			location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
 			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(i, kvStateId, serverAddress);
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
 			}
 
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
@@ -281,7 +281,7 @@ public class QueryableStateClientTest {
 				kvStateIds[i] = registries[i].registerKvState(
 						jobId,
 						new JobVertexID(),
-						i, // key group index
+						new KeyGroupRange(i, i),
 						"choco",
 						kvState);
 			}
@@ -302,7 +302,7 @@ public class QueryableStateClientTest {
 			// Location lookup service
 			KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
 			for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
-				location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+				location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
 			}
 
 			KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
@@ -385,7 +385,7 @@ public class QueryableStateClientTest {
 
 		// Exact contents don't matter here
 		KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
-		location.registerKvState(0, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+		location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
 
 		JobID jobId1 = new JobID();
 		JobID jobId2 = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index f785174..c8fb4bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -595,7 +595,7 @@ public class KvStateClientTest {
 				KvState<?> kvState = (KvState<?>) state;
 
 				// Register KvState (one state instance for all server)
-				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), 0, "any", kvState);
+				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
 			}
 
 			final KvStateClient finalClient = client;

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 52c807f..7e6d713 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -278,7 +278,7 @@ public class KvStateServerHandlerTest {
 		KvStateID kvStateId = registry.registerKvState(
 				new JobID(),
 				new JobVertexID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"vanilla",
 				kvState);
 
@@ -681,18 +681,18 @@ public class KvStateServerHandlerTest {
 	 */
 	static class TestRegistryListener implements KvStateRegistryListener {
 		volatile JobVertexID jobVertexID;
-		volatile int keyGroupIndex;
+		volatile KeyGroupRange keyGroupIndex;
 		volatile String registrationName;
 		volatile KvStateID kvStateId;
 
 		@Override
 		public void notifyKvStateRegistered(JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName,
 				KvStateID kvStateId) {
 			this.jobVertexID = jobVertexId;
-			this.keyGroupIndex = keyGroupIndex;
+			this.keyGroupIndex = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;
 		}
@@ -700,7 +700,7 @@ public class KvStateServerHandlerTest {
 		@Override
 		public void notifyKvStateUnregistered(JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName) {
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 33ec182..73e2808 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1108,6 +1108,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+		KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
 
 		KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
 		registry.registerListener(listener);
@@ -1122,7 +1123,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		// Verify registered
 		verify(listener, times(1)).notifyKvStateRegistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
 		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
@@ -1130,7 +1131,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.close();
 
 		verify(listener, times(1)).notifyKvStateUnregistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"));
 		backend.close();
 		// Initialize again
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
@@ -1140,12 +1141,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		// Verify registered again
 		verify(listener, times(2)).notifyKvStateRegistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 		backend.close();
 
 	}
-	
+
 	@Test
 	public void testEmptyStateCheckpointing() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index b99858a..1259460 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -93,7 +93,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@Ignore
 public class QueryableStateITCase extends TestLogger {
 
 	private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);