You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:40 UTC

[22/27] flink git commit: [FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default max. parallelism at 128

[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default max. parallelism at 128


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d430618
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d430618
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d430618

Branch: refs/heads/master
Commit: 6d4306186e09be6f2557600ed7a853c33d3ae6bd
Parents: 2b7a8d6
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Aug 29 11:53:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   3 +-
 .../state/RocksDBKeyedStateBackend.java         |   9 +-
 .../streaming/state/RocksDBStateBackend.java    |   9 +-
 .../state/RocksDBStateBackendConfigTest.java    |   9 +-
 .../api/common/state/KeyGroupAssigner.java      |  53 -----
 .../java/org/apache/flink/util/MathUtils.java   |  15 ++
 .../org/apache/flink/util/MathUtilTest.java     |  31 +++
 .../checkpoint/CheckpointCoordinator.java       |   3 +-
 .../flink/runtime/jobgraph/JobVertex.java       |   3 +-
 .../runtime/state/AbstractStateBackend.java     |   5 +-
 .../runtime/state/HashKeyGroupAssigner.java     |  66 ------
 .../flink/runtime/state/KeyGroupRange.java      |  42 ----
 .../runtime/state/KeyGroupRangeAssignment.java  |  97 +++++++++
 .../flink/runtime/state/KeyedStateBackend.java  |  17 +-
 .../state/filesystem/FsStateBackend.java        |   9 +-
 .../runtime/state/heap/AbstractHeapState.java   |   4 +-
 .../state/heap/HeapKeyedStateBackend.java       |   9 +-
 .../flink/runtime/state/heap/HeapListState.java |   3 +-
 .../state/memory/MemoryStateBackend.java        |   9 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   3 +-
 .../runtime/query/QueryableStateClientTest.java |   4 +-
 .../runtime/query/netty/KvStateClientTest.java  |   5 +-
 .../query/netty/KvStateServerHandlerTest.java   |  16 +-
 .../runtime/query/netty/KvStateServerTest.java  |   6 +-
 .../runtime/state/StateBackendTestBase.java     |  49 ++---
 .../streaming/api/datastream/KeyedStream.java   |   5 +-
 .../flink/streaming/api/graph/StreamConfig.java |  22 +-
 .../api/graph/StreamGraphGenerator.java         |  24 ++-
 .../api/graph/StreamingJobGraphGenerator.java   |   9 +-
 .../api/operators/AbstractStreamOperator.java   |   5 +-
 .../partitioner/KeyGroupStreamPartitioner.java  |  24 +--
 .../streaming/runtime/tasks/StreamTask.java     |   7 +-
 .../api/graph/StreamGraphGeneratorTest.java     |  26 +--
 .../graph/StreamingJobGraphGeneratorTest.java   | 200 -------------------
 .../operators/StreamingRuntimeContextTest.java  |   5 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  19 +-
 .../KeyGroupStreamPartitionerTest.java          |   3 +-
 .../tasks/OneInputStreamTaskTestHarness.java    |   3 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  16 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 -
 .../test/checkpointing/RescalingITCase.java     |  27 +--
 .../streaming/runtime/StateBackendITCase.java   |   5 +-
 42 files changed, 297 insertions(+), 586 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index cbc2757..e878ad5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 				backend.getKeySerializer(),
 				namespaceSerializer);
 
-		int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
+		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
 		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
 		return backend.db.get(columnFamily, keySerializationStream.toByteArray());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a1634b2..177c09f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,7 +21,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -131,11 +130,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			ColumnFamilyOptions columnFamilyOptions,
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange
 	) throws Exception {
 
-		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+		super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
 
 		this.operatorIdentifier = operatorIdentifier;
 		this.jobId = jobId;
@@ -183,7 +182,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			ColumnFamilyOptions columnFamilyOptions,
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoreState
 	) throws Exception {
@@ -195,7 +194,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			columnFamilyOptions,
 			kvStateRegistry,
 			keySerializer,
-			keyGroupAssigner,
+			numberOfKeyGroups,
 			keyGroupRange);
 
 		LOG.info("Initializing RocksDB keyed state backend from snapshot.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f950751..0fdbd5f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -18,7 +18,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
@@ -230,7 +229,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 
@@ -246,7 +245,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				getColumnOptions(),
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange);
 	}
 
@@ -254,7 +253,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -270,7 +269,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				getColumnOptions(),
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange,
 				restoredState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index acf6cb8..3b851be 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
@@ -93,7 +92,7 @@ public class RocksDBStateBackendConfigTest {
 				env.getJobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				1,
 				new KeyGroupRange(0, 0),
 				env.getTaskKvStateRegistry());
 
@@ -147,7 +146,7 @@ public class RocksDBStateBackendConfigTest {
 				env.getJobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				1,
 				new KeyGroupRange(0, 0),
 				env.getTaskKvStateRegistry());
 
@@ -182,7 +181,7 @@ public class RocksDBStateBackendConfigTest {
 						env.getJobID(),
 						"foobar",
 						IntSerializer.INSTANCE,
-						new HashKeyGroupAssigner<Integer>(1),
+						1,
 						new KeyGroupRange(0, 0),
 						new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
 			}
@@ -224,7 +223,7 @@ public class RocksDBStateBackendConfigTest {
 						env.getJobID(),
 						"foobar",
 						IntSerializer.INSTANCE,
-						new HashKeyGroupAssigner<Integer>(1),
+						1,
 						new KeyGroupRange(0, 0),
 						new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
deleted file mode 100644
index bb0691e..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-
-/**
- * Assigns a key to a key group index. A key group is the smallest unit of partitioned state
- * which is assigned to an operator. An operator can be assigned multiple key groups.
- *
- * @param <K> Type of the key
- */
-@Internal
-public interface KeyGroupAssigner<K> extends Serializable {
-	/**
-	 * Calculates the key group index for the given key.
-	 *
-	 * @param key Key to be used
-	 * @return Key group index for the given key
-	 */
-	int getKeyGroupIndex(K key);
-
-	/**
-	 * Setups the key group assigner with the maximum parallelism (= number of key groups).
-	 *
-	 * @param numberOfKeygroups Maximum parallelism (= number of key groups)
-	 */
-	void setup(int numberOfKeygroups);
-
-	/**
-	 *
-	 * @return configured maximum parallelism
-	 */
-	int getNumberKeyGroups();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index f40c83a..49056cc 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -155,6 +155,21 @@ public final class MathUtils {
 		}
 	}
 
+	/**
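+	 * Rounds the given number up to the next power of two. Powers of two are returned unchanged.
+	 * @param x number to round; must not exceed 2^30, otherwise the int result overflows
+	 * @return x rounded up to the next power of two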
+	 */
+	public static int roundUpToPowerOfTwo(int x) {
+		x = x - 1;
+		x |= x >> 1;
+		x |= x >> 2;
+		x |= x >> 4;
+		x |= x >> 8;
+		x |= x >> 16;
+		return x + 1;
+	}
+
 	// ============================================================================================
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
index 7917a7b..c98b7fc 100644
--- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -81,6 +81,37 @@ public class MathUtilTest {
 	}
 
 	@Test
+	public void testRoundUpToPowerOf2() {
+		assertEquals(0, MathUtils.roundUpToPowerOfTwo(0));
+		assertEquals(1, MathUtils.roundUpToPowerOfTwo(1));
+		assertEquals(2, MathUtils.roundUpToPowerOfTwo(2));
+		assertEquals(4, MathUtils.roundUpToPowerOfTwo(3));
+		assertEquals(4, MathUtils.roundUpToPowerOfTwo(4));
+		assertEquals(8, MathUtils.roundUpToPowerOfTwo(5));
+		assertEquals(8, MathUtils.roundUpToPowerOfTwo(6));
+		assertEquals(8, MathUtils.roundUpToPowerOfTwo(7));
+		assertEquals(8, MathUtils.roundUpToPowerOfTwo(8));
+		assertEquals(16, MathUtils.roundUpToPowerOfTwo(9));
+		assertEquals(16, MathUtils.roundUpToPowerOfTwo(15));
+		assertEquals(16, MathUtils.roundUpToPowerOfTwo(16));
+		assertEquals(32, MathUtils.roundUpToPowerOfTwo(17));
+		assertEquals(32, MathUtils.roundUpToPowerOfTwo(31));
+		assertEquals(32, MathUtils.roundUpToPowerOfTwo(32));
+		assertEquals(64, MathUtils.roundUpToPowerOfTwo(33));
+		assertEquals(64, MathUtils.roundUpToPowerOfTwo(42));
+		assertEquals(64, MathUtils.roundUpToPowerOfTwo(63));
+		assertEquals(64, MathUtils.roundUpToPowerOfTwo(64));
+		assertEquals(128, MathUtils.roundUpToPowerOfTwo(125));
+		assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654));
+		assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363));
+		assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863));
+		assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864));
+		assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE));
+		assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF));
+		assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x40000000));
+	}
+
+	@Test
 	public void testPowerOfTwo() {
 		assertTrue(MathUtils.isPowerOf2(1));
 		assertTrue(MathUtils.isPowerOf2(2));

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e751e08..52f6d9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
@@ -886,7 +887,7 @@ public class CheckpointCoordinator {
 		List<KeyGroupRange> result = new ArrayList<>(parallelism);
 		int start = 0;
 		for (int i = 0; i < parallelism; ++i) {
-			result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
+			result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
 		}
 		return result;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a623295..8ddc9f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -253,7 +253,8 @@ public class JobVertex implements java.io.Serializable {
 	 */
 	public void setMaxParallelism(int maxParallelism) {
 		org.apache.flink.util.Preconditions.checkArgument(
-				maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
+				maxParallelism > 0 && maxParallelism <= (1 << 15),
+				"The max parallelism must be at least 1 and smaller than 2^15.");
 
 		this.maxParallelism = maxParallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index e6093a8..0d2bf45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -53,7 +52,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception;
 
@@ -66,7 +65,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
deleted file mode 100644
index 9ee4b90..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.KeyGroupAssigner;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Hash based key group assigner. The assigner assigns each key to a key group using the hash value
- * of the key.
- *
- * @param <K> Type of the key
- */
-public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
-	private static final long serialVersionUID = -6319826921798945448L;
-
-	private static final int UNDEFINED_NUMBER_KEY_GROUPS = Integer.MIN_VALUE;
-
-	private int numberKeyGroups;
-
-	public HashKeyGroupAssigner() {
-		this(UNDEFINED_NUMBER_KEY_GROUPS);
-	}
-
-	public HashKeyGroupAssigner(int numberKeyGroups) {
-		Preconditions.checkArgument(numberKeyGroups > 0 || numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS,
-			"The number of key groups has to be greater than 0 or undefined. Use " +
-			"setMaxParallelism() to specify the number of key groups.");
-		this.numberKeyGroups = numberKeyGroups;
-	}
-
-	public int getNumberKeyGroups() {
-		return numberKeyGroups;
-	}
-
-	@Override
-	public int getKeyGroupIndex(K key) {
-		return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups;
-	}
-
-	@Override
-	public void setup(int numberOfKeygroups) {
-		Preconditions.checkArgument(numberOfKeygroups > 0, "The number of key groups has to be " +
-			"greater than 0. Use setMaxParallelism() to specify the number of key " +
-			"groups.");
-
-		this.numberKeyGroups = numberOfKeygroups;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
index 9e74036..3a9d3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -175,46 +175,4 @@ public class KeyGroupRange implements Iterable<Integer>, Serializable {
 		return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP;
 	}
 
-	/**
-	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
-	 * parallelism.
-	 *
-	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
-	 * to go beyond this boundary, this method must perform arithmetic on long values.
-	 *
-	 * @param maxParallelism Maximal parallelism that the job was initially created with.
-	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
-	 * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
-	 * @return
-	 */
-	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
-			int maxParallelism,
-			int parallelism,
-			int operatorIndex) {
-		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
-		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
-		Preconditions.checkArgument(maxParallelism <= Short.MAX_VALUE, "Maximum parallelism must be smaller than Short.MAX_VALUE.");
-
-		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
-		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
-		return new KeyGroupRange(start, end);
-	}
-
-	/**
-	 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
-	 * parallelism.
-	 *
-	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
-	 * to go beyond this boundary, this method must perform arithmetic on long values.
-	 *
-	 * @param maxParallelism Maximal parallelism that the job was initially created with.
-	 *                       0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
-	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
-	 * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < maxParallelism.
-	 * @return The index of the operator to which elements from the given key-group should be routed under the given
-	 * parallelism and maxParallelism.
-	 */
-	public static final int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
-		return keyGroupId * parallelism / maxParallelism;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
new file mode 100644
index 0000000..eceb6f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+	public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+	private KeyGroupRangeAssignment() {
+		throw new AssertionError();
+	}
+
+	/**
+	 * Assigns the given key to a parallel operator index.
+	 *
+	 * @param key the key to assign
+	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+	 * @param parallelism the current parallelism of the operator
+	 * @return the index of the parallel operator to which the given key should be routed.
+	 */
+	public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
+		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
+	}
+
+	/**
+	 * Assigns the given key to a key-group index.
+	 *
+	 * @param key the key to assign
+	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+	 * @return the key-group to which the given key is assigned
+	 */
+	public static final int assignToKeyGroup(Object key, int maxParallelism) {
+		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+	}
+
+	/**
+	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
+	 * parallelism.
+	 *
+	 * IMPORTANT: maxParallelism must be <= 2^15 (32768) to avoid int overflow in this method. If we ever want
+	 * to go beyond this boundary, this method must perform arithmetic on long values.
+	 *
+	 * @param maxParallelism Maximal parallelism that the job was initially created with.
+	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
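+	 * @param operatorIndex  Index of the parallel operator instance, expected to satisfy 0 <= operatorIndex < parallelism.
+	 * @return The range of key-groups assigned to the operator with the given index.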
+	 */
+	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+			int maxParallelism,
+			int parallelism,
+			int operatorIndex) {
+		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
+		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
+		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
+
+		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
+		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
+		return new KeyGroupRange(start, end);
+	}
+
+	/**
+	 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
+	 * parallelism.
+	 *
+	 * IMPORTANT: maxParallelism must be <= 2^15 (32768) to avoid int overflow in this method. If we ever want
+	 * to go beyond this boundary, this method must perform arithmetic on long values.
+	 *
+	 * @param maxParallelism Maximal parallelism that the job was initially created with.
+	 *                       0 < parallelism <= maxParallelism <= 2^15 must hold.
+	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
+	 * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < maxParallelism.
+	 * @return The index of the operator to which elements from the given key-group should be routed under the given
+	 * parallelism and maxParallelism.
+	 */
+	public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
+		return keyGroupId * parallelism / maxParallelism;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 2d1d25c..bf9018e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
@@ -69,8 +68,8 @@ public abstract class KeyedStateBackend<K> {
 	@SuppressWarnings("rawtypes")
 	private KvState lastState;
 
-	/** KeyGroupAssigner which determines the key group for each keys */
-	protected final KeyGroupAssigner<K> keyGroupAssigner;
+	/** The number of key-groups aka max parallelism */
+	protected final int numberOfKeyGroups;
 
 	/** Range of key-groups for which this backend is responsible */
 	protected final KeyGroupRange keyGroupRange;
@@ -81,12 +80,12 @@ public abstract class KeyedStateBackend<K> {
 	public KeyedStateBackend(
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange) {
 
 		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+		this.numberOfKeyGroups = numberOfKeyGroups; // primitive int: a null check would only box it and can never fail
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 	}
 
@@ -157,7 +156,7 @@ public abstract class KeyedStateBackend<K> {
 	 */
 	public void setCurrentKey(K newKey) {
 		this.currentKey = newKey;
-		this.currentKeyGroup = keyGroupAssigner.getKeyGroupIndex(newKey);
+		this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
 	}
 
 	/**
@@ -179,11 +178,7 @@ public abstract class KeyedStateBackend<K> {
 	}
 
 	public int getNumberOfKeyGroups() {
-		return keyGroupAssigner.getNumberKeyGroups();
-	}
-
-	public KeyGroupAssigner<K> getKeyGroupAssigner() {
-		return keyGroupAssigner;
+		return numberOfKeyGroups;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5495244..6d92a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -181,13 +180,13 @@ public class FsStateBackend extends AbstractStateBackend {
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange);
 	}
 
@@ -197,14 +196,14 @@ public class FsStateBackend extends AbstractStateBackend {
 			JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange,
 				restoredState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 9863c93..18d1bc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
@@ -138,7 +138,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
 		Preconditions.checkState(key != null, "No key given.");
 
 		Map<N, Map<K, SV>> namespaceMap =
-				stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
 
 		if (namespaceMap == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index fcb4bef..8d13941 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -76,20 +75,20 @@ public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	public HeapKeyedStateBackend(
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange) {
 
-		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+		super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
 
 		LOG.info("Initializing heap keyed state backend with stream factory.");
 	}
 
 	public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState) throws Exception {
-		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+		super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
 
 		LOG.info("Initializing heap keyed state backend from snapshot.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 4c65c25..9552325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.util.Preconditions;
 
@@ -119,7 +120,7 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(key != null, "No key given.");
 
 		Map<N, Map<K, ArrayList<V>>> namespaceMap =
-				stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
 
 		if (namespaceMap == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 654c367..179dfe7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -80,14 +79,14 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	public <K> KeyedStateBackend<K> createKeyedStateBackend(
 			Environment env, JobID jobID,
 			String operatorIdentifier, TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			TaskKvStateRegistry kvStateRegistry) throws IOException {
 
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange);
 	}
 
@@ -96,7 +95,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 			Environment env, JobID jobID,
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -104,7 +103,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange,
 				restoredState);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 495dced..4972c51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -2458,7 +2459,7 @@ public class CheckpointCoordinatorTest {
 	private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
 		List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism);
 		for (int i = 0; i < maxParallelism; ++i) {
-			KeyGroupRange range = ranges.get(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i));
+			KeyGroupRange range = ranges.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i));
 			if (!range.contains(i)) {
 				Assert.fail("Could not find expected key-group " + i + " in range " + range);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 3380907..405f962 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -232,6 +231,7 @@ public class QueryableStateClientTest {
 		// Config
 		int numServers = 2;
 		int numKeys = 1024;
+		int numKeyGroups = 1;
 
 		JobID jobId = new JobID();
 		JobVertexID jobVertexId = new JobVertexID();
@@ -250,7 +250,7 @@ public class QueryableStateClientTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 796481c..f785174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequest;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
@@ -533,6 +532,8 @@ public class KvStateClientTest {
 
 		final int batchSize = 16;
 
+		final int numKeyGroups = 1;
+
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		KvStateRegistry dummyRegistry = new KvStateRegistry();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
@@ -542,7 +543,7 @@ public class KvStateClientTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 3d2e8b5..52c807f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KvState;
@@ -89,6 +88,7 @@ public class KvStateServerHandlerTest {
 		ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
 		desc.setQueryable("vanilla");
 
+		int numKeyGroups =1;
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
@@ -97,7 +97,7 @@ public class KvStateServerHandlerTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -200,6 +200,7 @@ public class KvStateServerHandlerTest {
 		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
+		int numKeyGroups = 1;
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
@@ -208,7 +209,7 @@ public class KvStateServerHandlerTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -346,6 +347,7 @@ public class KvStateServerHandlerTest {
 		KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
+		int numKeyGroups = 1;
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
@@ -354,7 +356,7 @@ public class KvStateServerHandlerTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -484,6 +486,7 @@ public class KvStateServerHandlerTest {
 		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
+		int numKeyGroups = 1;
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
@@ -492,7 +495,7 @@ public class KvStateServerHandlerTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 
@@ -579,6 +582,7 @@ public class KvStateServerHandlerTest {
 		KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
 		EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
 
+		int numKeyGroups = 1;
 		AbstractStateBackend abstractBackend = new MemoryStateBackend();
 		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 		dummyEnv.setKvStateRegistry(registry);
@@ -587,7 +591,7 @@ public class KvStateServerHandlerTest {
 				new JobID(),
 				"test_op",
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numKeyGroups,
 				new KeyGroupRange(0, 0),
 				registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
index 30d91b6..e92fb10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -80,7 +79,6 @@ public class KvStateServerTest {
 	public void testSimpleRequest() throws Exception {
 		KvStateServer server = null;
 		Bootstrap bootstrap = null;
-
 		try {
 			KvStateRegistry registry = new KvStateRegistry();
 			KvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -89,7 +87,7 @@ public class KvStateServerTest {
 			server.start();
 
 			KvStateServerAddress serverAddress = server.getAddress();
-
+			int numKeyGroups = 1;
 			AbstractStateBackend abstractBackend = new MemoryStateBackend();
 			DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
 			dummyEnv.setKvStateRegistry(registry);
@@ -98,7 +96,7 @@ public class KvStateServerTest {
 					new JobID(),
 					"test_op",
 					IntSerializer.INSTANCE,
-					new HashKeyGroupAssigner<Integer>(1),
+					numKeyGroups,
 					new KeyGroupRange(0, 0),
 					registry.createTaskRegistry(new JobID(), new JobVertexID()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f094bd5..5984aca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -41,7 +40,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -56,7 +54,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -90,14 +87,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	protected <K> KeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
 		return createKeyedBackend(
 				keySerializer,
-				new HashKeyGroupAssigner<K>(10),
+				10,
 				new KeyGroupRange(0, 9),
 				env);
 	}
 
 	protected <K> KeyedStateBackend<K> createKeyedBackend(
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			Environment env) throws Exception {
 		return getStateBackend().createKeyedStateBackend(
@@ -105,7 +102,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				new JobID(),
 				"test_op",
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange,
 				env.getTaskKvStateRegistry());
 	}
@@ -120,7 +117,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			Environment env) throws Exception {
 		return restoreKeyedBackend(
 				keySerializer,
-				new HashKeyGroupAssigner<K>(10),
+				10,
 				new KeyGroupRange(0, 9),
 				Collections.singletonList(state),
 				env);
@@ -128,7 +125,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 	protected <K> KeyedStateBackend<K> restoreKeyedBackend(
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> state,
 			Environment env) throws Exception {
@@ -137,7 +134,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 				new JobID(),
 				"test_op",
 				keySerializer,
-				keyGroupAssigner,
+				numberOfKeyGroups,
 				keyGroupRange,
 				state,
 				env.getTaskKvStateRegistry());
@@ -243,7 +240,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		KeyedStateBackend<Integer> backend = createKeyedBackend(
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				1,
 				new KeyGroupRange(0, 0),
 				new DummyEnvironment("test_op", 1, 0));
 
@@ -277,7 +274,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.close();
 		backend = restoreKeyedBackend(
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				1,
 				new KeyGroupRange(0, 0),
 				Collections.singletonList(snapshot1),
 				new DummyEnvironment("test_op", 1, 0));
@@ -675,12 +672,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		final int MAX_PARALLELISM = 10;
 
 		CheckpointStreamFactory streamFactory = createStreamFactory();
-
-		HashKeyGroupAssigner<Integer> keyGroupAssigner = new HashKeyGroupAssigner<>(10);
-
 		KeyedStateBackend<Integer> backend = createKeyedBackend(
 				IntSerializer.INSTANCE,
-				keyGroupAssigner,
+				MAX_PARALLELISM,
 				new KeyGroupRange(0, MAX_PARALLELISM - 1),
 				new DummyEnvironment("test", 1, 0));
 
@@ -695,12 +689,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		Random rand = new Random(0);
 
 		// for each key, determine into which half of the key-group space they fall
-		int firstKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInFirstHalf) * 2 / MAX_PARALLELISM;
-		int secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+		int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, MAX_PARALLELISM, 2);
+		int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, MAX_PARALLELISM, 2);
 
 		while (firstKeyHalf == secondKeyHalf) {
 			keyInSecondHalf = rand.nextInt();
-			secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+			secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, MAX_PARALLELISM, 2);
 		}
 
 		backend.setCurrentKey(keyInFirstHalf);
@@ -714,18 +708,18 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		List<KeyGroupsStateHandle> firstHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles(
 				Collections.singletonList(snapshot),
-				new KeyGroupRange(0, 4));
+				KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 0));
 
 		List<KeyGroupsStateHandle> secondHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles(
 				Collections.singletonList(snapshot),
-				new KeyGroupRange(5, 9));
+				KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 1));
 
 		backend.close();
 
 		// backend for the first half of the key group range
 		KeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend(
 				IntSerializer.INSTANCE,
-				keyGroupAssigner,
+				MAX_PARALLELISM,
 				new KeyGroupRange(0, 4),
 				firstHalfKeyGroupStates,
 				new DummyEnvironment("test", 1, 0));
@@ -733,7 +727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		// backend for the second half of the key group range
 		KeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend(
 				IntSerializer.INSTANCE,
-				keyGroupAssigner,
+				MAX_PARALLELISM,
 				new KeyGroupRange(5, 9),
 				secondHalfKeyGroupStates,
 				new DummyEnvironment("test", 1, 0));
@@ -978,9 +972,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 	 */
 	@SuppressWarnings("unchecked")
 	protected void testConcurrentMapIfQueryable() throws Exception {
+		final int numberOfKeyGroups = 1;
 		KeyedStateBackend<Integer> backend = createKeyedBackend(
 				IntSerializer.INSTANCE,
-				new HashKeyGroupAssigner<Integer>(1),
+				numberOfKeyGroups,
 				new KeyGroupRange(0, 0),
 				new DummyEnvironment("test_op", 1, 0));
 
@@ -1005,7 +1000,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			backend.setCurrentKey(1);
 			state.update(121818273);
 
-			int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
 			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
 			assertNotNull("State not set", stateTable.get(keyGroupIndex));
 			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1031,7 +1026,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
 			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
 			assertNotNull("State not set", stateTable.get(keyGroupIndex));
 			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1062,7 +1057,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
 			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
 			assertNotNull("State not set", stateTable.get(keyGroupIndex));
 			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1093,7 +1088,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 			backend.setCurrentKey(1);
 			state.add(121818273);
 
-			int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+			int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
 			StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
 			assertNotNull("State not set", stateTable.get(keyGroupIndex));
 			assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1fb34b8..af907e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -111,8 +111,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			new PartitionTransformation<>(
 				dataStream.getTransformation(),
 				new KeyGroupStreamPartitioner<>(
-					keySelector,
-					new HashKeyGroupAssigner<KEY>())));
+					keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
 		this.keySelector = keySelector;
 		this.keyType = keyType;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 1a92ba4..8e807db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
@@ -76,8 +75,7 @@ public class StreamConfig implements Serializable {
 	private static final String STATE_BACKEND = "statebackend";
 	private static final String STATE_PARTITIONER = "statePartitioner";
 
-	/** key for the {@link KeyGroupAssigner} for key to key group index mappings */
-	private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner";
+	private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups";
 
 	private static final String STATE_KEY_SERIALIZER = "statekeyser";
 	
@@ -445,28 +443,26 @@ public class StreamConfig implements Serializable {
 	}
 
 	/**
-	 * Sets the {@link KeyGroupAssigner} to be used for the current {@link StreamOperator}.
+	 * Sets the number of key-groups to be used for the current {@link StreamOperator}.
 	 *
-	 * @param keyGroupAssigner Key group assigner to be used
+	 * @param numberOfKeyGroups Number of key-groups to be used
 	 */
-	public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) {
+	public void setNumberOfKeyGroups(int numberOfKeyGroups) {
 		try {
-			InstantiationUtil.writeObjectToConfig(keyGroupAssigner, this.config, KEY_GROUP_ASSIGNER);
+			InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, NUMBER_OF_KEY_GROUPS);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not serialize virtual state partitioner.", e);
 		}
 	}
 
 	/**
-	 * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}.
+	 * Gets the number of key-groups for the {@link StreamOperator}.
 	 *
-	 * @param classLoader Classloader to be used for the deserialization
-	 * @param <K> Type of the keys to be assigned to key groups
-	 * @return Key group assigner
+	 * @return the number of key-groups
 	 */
-	public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader classLoader) {
+	public Integer getNumberOfKeyGroups(ClassLoader cl) {
 		try {
-			return InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, classLoader);
+			return InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate virtual state partitioner.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f9f26e9..506b664 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,24 +147,24 @@ public class StreamGraphGenerator {
 		LOG.debug("Transforming " + transform);
 
 		if (transform.getMaxParallelism() <= 0) {
+
 			// if the max parallelism hasn't been set, then first use the job wide max parallelism
 			// from the ExecutionConfig. If this value has not been specified either, then derive it
 			// from the operator parallelism, using at least the default max parallelism.
 			int maxParallelism = env.getConfig().getMaxParallelism();
 
 			if (maxParallelism <= 0) {
-				maxParallelism = transform.getParallelism();
-				/**
-				 * TODO: Remove once the parallelism settings works properly in Flink (FLINK-3885)
-				 * Currently, the parallelism will be set to 1 on the JobManager iff it encounters
-				 * a negative parallelism value. We need to know this for the
-				 * KeyGroupStreamPartitioner on the client-side. Thus, we already set the value to
-				 * 1 here.
-				 */
-				if (maxParallelism <= 0) {
-					transform.setParallelism(1);
-					maxParallelism = 1;
+
+				int parallelism = transform.getParallelism();
+
+				if(parallelism <= 0) {
+					parallelism = 1;
+					transform.setParallelism(parallelism);
 				}
+
+				maxParallelism = Math.max(
+						MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
+						KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
 			}
 
 			transform.setMaxParallelism(maxParallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 76fdaca..a024895 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -357,12 +355,9 @@ public class StreamingJobGraphGenerator {
 		config.setStatePartitioner(1, vertex.getStatePartitioner2());
 		config.setStateKeySerializer(vertex.getStateKeySerializer());
 
-		// only set the key group assigner if the vertex uses partitioned state (= KeyedStream).
+		// only set the max parallelism if the vertex uses partitioned state (= KeyedStream).
 		if (vertex.getStatePartitioner1() != null) {
-			// the key group assigner has to know the number of key groups (= maxParallelism)
-			KeyGroupAssigner<Object> keyGroupAssigner = new HashKeyGroupAssigner<Object>(vertex.getMaxParallelism());
-
-			config.setKeyGroupAssigner(keyGroupAssigner);
+			config.setNumberOfKeyGroups(vertex.getMaxParallelism());
 		}
 
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 718c0c7..71296e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -133,14 +134,14 @@ public abstract class AbstractStreamOperator<OUT>
 			if (null != keySerializer) {
 				ExecutionConfig execConf = container.getEnvironment().getExecutionConfig();;
 
-				KeyGroupRange subTaskKeyGroupRange = KeyGroupRange.computeKeyGroupRangeForOperatorIndex(
+				KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
 						container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
 						container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
 						container.getIndexInSubtaskGroup());
 
 				keyedStateBackend = container.createKeyedStateBackend(
 						keySerializer,
-						container.getConfiguration().getKeyGroupAssigner(getUserCodeClassloader()),
+						container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
 						subTaskKeyGroupRange);
 			}
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 108a3ae..256fee1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -18,16 +18,14 @@
 package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 /**
- * Partitioner selects the target channel based on the key group index. The key group
- * index is derived from the key of the elements using the {@link KeyGroupAssigner}.
+ * Partitioner selects the target channel based on the key group index.
  *
  * @param <T> Type of the elements in the Stream being partitioned
  */
@@ -39,15 +37,16 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
 
 	private final KeySelector<T, K> keySelector;
 
-	private final KeyGroupAssigner<K> keyGroupAssigner;
+	private int maxParallelism;
 
-	public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, KeyGroupAssigner<K> keyGroupAssigner) {
+	public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
 		this.keySelector = Preconditions.checkNotNull(keySelector);
-		this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+		this.maxParallelism = maxParallelism;
 	}
 
-	public KeyGroupAssigner<K> getKeyGroupAssigner() {
-		return keyGroupAssigner;
+	public int getMaxParallelism() {
+		return maxParallelism;
 	}
 
 	@Override
@@ -61,10 +60,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
 		} catch (Exception e) {
 			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
 		}
-		returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup(
-				keyGroupAssigner.getNumberKeyGroups(),
-				numberOfOutputChannels,
-				keyGroupAssigner.getKeyGroupIndex(key));
+		returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
 		return returnArray;
 	}
 
@@ -80,6 +76,6 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
 
 	@Override
 	public void configure(int maxParallelism) {
-		keyGroupAssigner.setup(maxParallelism);
+		this.maxParallelism = maxParallelism;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 13f650c..701281b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -762,7 +761,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	public <K> KeyedStateBackend<K> createKeyedStateBackend(
 			TypeSerializer<K> keySerializer,
-			KeyGroupAssigner<K> keyGroupAssigner,
+			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange) throws Exception {
 
 		if (keyedStateBackend != null) {
@@ -779,7 +778,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					getEnvironment().getJobID(),
 					operatorIdentifier,
 					keySerializer,
-					keyGroupAssigner,
+					numberOfKeyGroups,
 					keyGroupRange,
 					lazyRestoreKeyGroupStates,
 					getEnvironment().getTaskKvStateRegistry());
@@ -791,7 +790,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					getEnvironment().getJobID(),
 					operatorIdentifier,
 					keySerializer,
-					keyGroupAssigner,
+					numberOfKeyGroups,
 					keyGroupRange,
 					getEnvironment().getTaskKvStateRegistry());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 06d381f..874274f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -270,10 +268,6 @@ public class StreamGraphGeneratorTest {
 		StreamNode keyedResultNode = graph.getStreamNode(keyedResult.getId());
 
 		StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner();
-
-		HashKeyGroupAssigner<?> hashKeyGroupAssigner = extractHashKeyGroupAssigner(streamPartitioner);
-
-		assertEquals(maxParallelism, hashKeyGroupAssigner.getNumberKeyGroups());
 	}
 
 	/**
@@ -384,7 +378,7 @@ public class StreamGraphGeneratorTest {
 	}
 
 	/**
-	 * Tests that the max parallelism and the key group partitioner is properly set for connected
+	 * Tests that the max parallelism is properly set for connected
 	 * streams.
 	 */
 	@Test
@@ -423,24 +417,6 @@ public class StreamGraphGeneratorTest {
 
 		StreamPartitioner<?> streamPartitioner1 = keyedResultNode.getInEdges().get(0).getPartitioner();
 		StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner();
-
-		HashKeyGroupAssigner<?> hashKeyGroupAssigner1 = extractHashKeyGroupAssigner(streamPartitioner1);
-		assertEquals(maxParallelism, hashKeyGroupAssigner1.getNumberKeyGroups());
-
-		HashKeyGroupAssigner<?> hashKeyGroupAssigner2 = extractHashKeyGroupAssigner(streamPartitioner2);
-		assertEquals(maxParallelism, hashKeyGroupAssigner2.getNumberKeyGroups());
-	}
-
-	private HashKeyGroupAssigner<?> extractHashKeyGroupAssigner(StreamPartitioner<?> streamPartitioner) {
-		assertTrue(streamPartitioner instanceof KeyGroupStreamPartitioner);
-
-		KeyGroupStreamPartitioner<?, ?> keyGroupStreamPartitioner = (KeyGroupStreamPartitioner<?, ?>) streamPartitioner;
-
-		KeyGroupAssigner<?> keyGroupAssigner = keyGroupStreamPartitioner.getKeyGroupAssigner();
-
-		assertTrue(keyGroupAssigner instanceof HashKeyGroupAssigner);
-
-		return (HashKeyGroupAssigner<?>) keyGroupAssigner;
 	}
 
 	private static class OutputTypeConfigurableOperationWithTwoInputs