You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:21 UTC

[03/27] flink git commit: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter

[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter

This introduces a new KeyGroupAssigner that assigns keys to key groups and
also adds the max parallelism parameter throughout all API levels.

This also adds tests for the newly introduced features.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec975aab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec975aab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec975aab

Branch: refs/heads/master
Commit: ec975aaba79449bd93020f296b05ea509ea57bdc
Parents: 7cd9bb5
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 28 15:08:24 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:04:31 2016 +0200

----------------------------------------------------------------------
 docs/dev/api_concepts.md                        |   2 +
 .../flink-statebackend-rocksdb/pom.xml          |   7 +
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../src/test/resources/log4j.properties         |   2 +-
 .../flink/api/common/ExecutionConfig.java       |  51 +-
 .../api/common/state/KeyGroupAssigner.java      |  47 ++
 .../flink/api/common/state/StateDescriptor.java |   1 +
 .../checkpoint/CheckpointCoordinator.java       |   6 +-
 .../InputChannelDeploymentDescriptor.java       |   1 +
 .../InputGateDeploymentDescriptor.java          |   1 +
 .../deployment/ResultPartitionLocation.java     |   1 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 -
 .../executiongraph/ExecutionJobVertex.java      |  35 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +
 .../flink/runtime/jobgraph/JobVertex.java       |  30 +-
 .../runtime/state/HashKeyGroupAssigner.java     |  66 ++
 .../flink/runtime/jobmanager/JobManager.scala   |   1 -
 .../checkpoint/CheckpointCoordinatorTest.java   |  17 -
 .../checkpoint/CheckpointStateRestoreTest.java  |   5 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   1 -
 .../scheduler/SchedulerTestUtils.java           |   2 +
 .../testutils/DummyEnvironment.java.orig        | 185 -----
 .../streaming/api/datastream/KeyedStream.java   |  12 +-
 .../datastream/SingleOutputStreamOperator.java  |  19 +
 .../environment/StreamExecutionEnvironment.java |  42 +-
 .../flink/streaming/api/graph/StreamConfig.java |  49 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  33 +-
 .../api/graph/StreamGraphGenerator.java         |  43 +-
 .../flink/streaming/api/graph/StreamNode.java   |  25 +
 .../api/graph/StreamingJobGraphGenerator.java   |  31 +-
 .../transformations/StreamTransformation.java   |  24 +
 .../ConfigurableStreamPartitioner.java          |  39 ++
 .../runtime/partitioner/HashPartitioner.java    |  66 --
 .../partitioner/KeyGroupStreamPartitioner.java  |  82 +++
 .../streaming/api/AggregationFunctionTest.java  |   8 +-
 .../flink/streaming/api/DataStreamTest.java     |   8 +-
 .../streaming/api/RestartStrategyTest.java      |   4 +-
 .../streaming/api/graph/SlotAllocationTest.java |   3 +-
 .../api/graph/StreamGraphGeneratorTest.java     | 220 ++++++
 .../graph/StreamingJobGraphGeneratorTest.java   | 216 +++++-
 .../StreamingJobGraphGeneratorNodeHashTest.java |  64 +-
 .../windowing/AllWindowTranslationTest.java     |   4 +
 .../windowing/WindowTranslationTest.java        |  10 +
 .../partitioner/HashPartitionerTest.java        |  71 --
 .../KeyGroupStreamPartitionerTest.java          |  74 ++
 .../partitioner/RescalePartitionerTest.java     |   4 +
 .../tasks/OneInputStreamTaskTestHarness.java    |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java | 176 +++--
 .../runtime/tasks/StreamTaskTestHarness.java    |  37 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |  11 +
 .../api/scala/StreamExecutionEnvironment.scala  |  18 +
 .../streaming/api/scala/DataStreamTest.scala    |   2 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   2 +-
 .../test/checkpointing/RescalingITCase.java     | 683 +++++++++++++++++++
 .../streaming/api/StreamingOperatorsITCase.java |  17 +
 .../streaming/runtime/DataStreamPojoITCase.java |   2 +
 .../test/streaming/runtime/TimestampITCase.java |   1 +
 .../flink/test/web/WebFrontendITCase.java       |   3 +-
 59 files changed, 2074 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index 468085a..b0c034c 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -960,6 +960,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.
 
+- `getMaxParallelism()` / `setMaxParallelism(int maxParallelism)` Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism, specifies the upper limit for dynamic scaling, and defines the number of key groups used for partitioned state.
+
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
 
 - `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more.

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index efd673c..314b83f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -69,6 +69,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_2.10</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
index ed2bbcb..4a30c6f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
@@ -24,4 +24,4 @@ log4j.rootLogger=OFF, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target = System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5b69794..81ee930 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -74,6 +74,13 @@ public class ExecutionConfig implements Serializable {
 	 */
 	public static final int PARALLELISM_DEFAULT = -1;
 
+	/**
+	 * The flag value indicating an unknown or unset parallelism. This value is
+	 * not a valid parallelism and indicates that the parallelism should remain
+	 * unchanged.
+	 */
+	public static final int PARALLELISM_UNKNOWN = -2;
+
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
 	// --------------------------------------------------------------------------------------------
@@ -86,6 +93,13 @@ public class ExecutionConfig implements Serializable {
 	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
+	 * The program wide maximum parallelism used for operators which haven't specified a maximum
+	 * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
+	 * number of key groups used for partitioned state.
+	 */
+	private int maxParallelism = -1;
+
+	/**
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
 	 */
 	@Deprecated
@@ -219,12 +233,41 @@ public class ExecutionConfig implements Serializable {
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism of an operator must be at least 1.");
+		if (parallelism != PARALLELISM_UNKNOWN) {
+			if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
+				throw new IllegalArgumentException(
+					"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
+			}
+			this.parallelism = parallelism;
+		}
+		return this;
+	}
 
-		this.parallelism = parallelism;
+	/**
+	 * Gets the maximum degree of parallelism defined for the program.
+	 *
+	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * defines the number of key groups used for partitioned state.
+	 *
+	 * @return Maximum degree of parallelism
+	 */
+	@PublicEvolving
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
 
-		return this;
+	/**
+	 * Sets the maximum degree of parallelism defined for the program.
+	 *
+	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * defines the number of key groups used for partitioned state.
+	 *
+	 * @param maxParallelism Maximum degree of parallelism to be used for the program.
+	 */
+	@PublicEvolving
+	public void setMaxParallelism(int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+		this.maxParallelism = maxParallelism;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
new file mode 100644
index 0000000..23463e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * Assigns a key to a key group index. A key group is the smallest unit of partitioned state
+ * which is assigned to an operator. An operator can be assigned multiple key groups.
+ *
+ * @param <K> Type of the key
+ */
+@Internal
+public interface KeyGroupAssigner<K> extends Serializable {
+	/**
+	 * Calculates the key group index for the given key.
+	 *
+	 * @param key Key to be used
+	 * @return Key group index for the given key
+	 */
+	int getKeyGroupIndex(K key);
+
+	/**
+	 * Sets up the key group assigner with the maximum parallelism (= number of key groups).
+	 *
+	 * @param maxParallelism Maximum parallelism (= number of key groups)
+	 */
+	void setup(int maxParallelism);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index d99f4de..483c954 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -44,6 +44,7 @@ import static java.util.Objects.requireNonNull;
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
  *
  * @param <S> The type of the State objects created from this {@code StateDescriptor}.
+ * @param <T> The type of the value of the state object described by this state descriptor.
  */
 @PublicEvolving
 public abstract class StateDescriptor<S extends State, T> implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3619f48..24fca5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -155,8 +155,6 @@ public class CheckpointCoordinator {
 	/** Helper for tracking checkpoint statistics  */
 	private final CheckpointStatsTracker statsTracker;
 
-	private final int numberKeyGroups;
-
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -165,7 +163,6 @@ public class CheckpointCoordinator {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpointAttempts,
-			int numberKeyGroups,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
 			ExecutionVertex[] tasksToCommitTo,
@@ -202,7 +199,6 @@ public class CheckpointCoordinator {
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 		this.userClassLoader = checkNotNull(userClassLoader);
 		this.statsTracker = checkNotNull(statsTracker);
-		this.numberKeyGroups = numberKeyGroups;
 
 		this.timer = new Timer("Checkpoint Timer", true);
 
@@ -797,7 +793,7 @@ public class CheckpointCoordinator {
 
 					int counter = 0;
 
-					List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(numberKeyGroups, executionJobVertex.getParallelism());
+					List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(executionJobVertex.getMaxParallelism(), executionJobVertex.getParallelism());
 
 					for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 						SubtaskState subtaskState = taskState.getState(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index e00a480..f31febb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -48,6 +48,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class InputChannelDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = 373711381640454080L;
 	private static Logger LOG = LoggerFactory.getLogger(InputChannelDeploymentDescriptor.class);
 
 	/** The ID of the partition the input channel is going to consume. */

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index ec4bd40..dde1ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -38,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class InputGateDeploymentDescriptor implements Serializable {
 
+	private static final long serialVersionUID = -7143441863165366704L;
 	/**
 	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 	 * intermediate result specified by this ID. This ID also identifies the input gate at the

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
index ca63e6b..895bea0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
@@ -47,6 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ResultPartitionLocation implements Serializable {
 
+	private static final long serialVersionUID = -6354238166937194463L;
 	/** The type of location for the result partition. */
 	private final LocationType locationType;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d7e40a3..92cab41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -340,7 +340,6 @@ public class ExecutionGraph {
 			long checkpointTimeout,
 			long minPauseBetweenCheckpoints,
 			int maxConcurrentCheckpoints,
-			int numberKeyGroups,
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
@@ -373,7 +372,6 @@ public class ExecutionGraph {
 				checkpointTimeout,
 				minPauseBetweenCheckpoints,
 				maxConcurrentCheckpoints,
-				numberKeyGroups,
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6272151..d3dc8fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.util.SerializableObject;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -69,6 +69,8 @@ public class ExecutionJobVertex {
 	private final List<IntermediateResult> inputs;
 	
 	private final int parallelism;
+
+	private final int maxParallelism;
 	
 	private final boolean[] finishedSubtasks;
 			
@@ -81,16 +83,23 @@ public class ExecutionJobVertex {
 	private final InputSplit[] inputSplits;
 
 	private InputSplitAssigner splitAssigner;
+	
+	public ExecutionJobVertex(
+		ExecutionGraph graph,
+		JobVertex jobVertex,
+		int defaultParallelism,
+		FiniteDuration timeout) throws JobException {
 
-	public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
-							int defaultParallelism, FiniteDuration timeout) throws JobException {
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
 	}
 	
-	public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
-							int defaultParallelism, FiniteDuration timeout, long createTimestamp)
-			throws JobException
-	{
+	public ExecutionJobVertex(
+		ExecutionGraph graph,
+		JobVertex jobVertex,
+		int defaultParallelism,
+		FiniteDuration timeout,
+		long createTimestamp) throws JobException {
+
 		if (graph == null || jobVertex == null) {
 			throw new NullPointerException();
 		}
@@ -102,6 +111,14 @@ public class ExecutionJobVertex {
 		int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
 		
 		this.parallelism = numTaskVertices;
+
+		int maxParallelism = jobVertex.getMaxParallelism();
+
+		Preconditions.checkArgument(maxParallelism >= parallelism, "The maximum parallelism (" +
+			maxParallelism + ") must be greater or equal than the parallelism (" + parallelism +
+			").");
+		this.maxParallelism = maxParallelism;
+
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
 		
 		this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
@@ -177,6 +194,10 @@ public class ExecutionJobVertex {
 		return parallelism;
 	}
 
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
 	public JobID getJobId() {
 		return graph.getJobID();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2495316..b15f851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -187,6 +187,10 @@ public class ExecutionVertex {
 		return this.subTaskIndex;
 	}
 
+	public int getMaxParallelism() {
+		return this.jobVertex.getMaxParallelism();
+	}
+
 	public int getNumberOfInputs() {
 		return this.inputEdges.length;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 379a42a..4786388 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -31,6 +28,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * The base class for job vertexes.
  */
@@ -57,6 +57,9 @@ public class JobVertex implements java.io.Serializable {
 	/** Number of subtasks to split this task into at runtime.*/
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
+	/** Maximum number of subtasks to split this task into at runtime. */
+	private int maxParallelism = Short.MAX_VALUE;
+
 	/** Custom configuration passed to the assigned task at runtime. */
 	private Configuration configuration;
 
@@ -234,6 +237,27 @@ public class JobVertex implements java.io.Serializable {
 		this.parallelism = parallelism;
 	}
 
+	/**
+	 * Gets the maximum parallelism for the task.
+	 *
+	 * @return The maximum parallelism for the task.
+	 */
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	/**
+	 * Sets the maximum parallelism for the task.
+	 *
+	 * @param maxParallelism The maximum parallelism to be set.
+	 */
+	public void setMaxParallelism(int maxParallelism) {
+		org.apache.flink.util.Preconditions.checkArgument(
+				maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
+
+		this.maxParallelism = maxParallelism;
+	}
+
 	public InputSplitSource<?> getInputSplitSource() {
 		return inputSplitSource;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
new file mode 100644
index 0000000..280746d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Hash based key group assigner. The assigner assigns each key to a key group using the hash value
+ * of the key.
+ *
+ * @param <K> Type of the key
+ */
+public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
+	private static final long serialVersionUID = -6319826921798945448L;
+
+	private static final int UNDEFINED_NUMBER_KEY_GROUPS = Integer.MIN_VALUE;
+
+	private int numberKeyGroups;
+
+	public HashKeyGroupAssigner() {
+		this(UNDEFINED_NUMBER_KEY_GROUPS);
+	}
+
+	public HashKeyGroupAssigner(int numberKeyGroups) {
+		Preconditions.checkArgument(numberKeyGroups > 0 || numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS,
+			"The number of key groups has to be greater than 0 or undefined. Use " +
+			"setMaxParallelism() to specify the number of key groups.");
+		this.numberKeyGroups = numberKeyGroups;
+	}
+
+	public int getNumberKeyGroups() {
+		return numberKeyGroups;
+	}
+
+	@Override
+	public int getKeyGroupIndex(K key) {
+		return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups;
+	}
+
+	@Override
+	public void setup(int numberKeyGroups) {
+		Preconditions.checkArgument(numberKeyGroups > 0, "The number of key groups has to be " +
+			"greater than 0. Use setMaxParallelism() to specify the number of key " +
+			"groups.");
+
+		this.numberKeyGroups = numberKeyGroups;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b706a1a..356f1a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1271,7 +1271,6 @@ class JobManager(
             snapshotSettings.getCheckpointTimeout,
             snapshotSettings.getMinPauseBetweenCheckpoints,
             snapshotSettings.getMaxConcurrentCheckpoints,
-            parallelism,
             triggerVertices,
             ackVertices,
             confirmVertices,

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 09c53d6..50330fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -86,7 +86,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					600000,
 					0, Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
@@ -140,7 +139,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
@@ -192,7 +190,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {},
@@ -245,7 +242,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
@@ -371,7 +367,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
@@ -493,7 +488,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
@@ -645,7 +639,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
@@ -782,7 +775,6 @@ public class CheckpointCoordinatorTest {
 					600000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex },
@@ -905,7 +897,6 @@ public class CheckpointCoordinatorTest {
 					200,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
@@ -975,7 +966,6 @@ public class CheckpointCoordinatorTest {
 					200000,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex },
@@ -1056,7 +1046,6 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex },
@@ -1149,7 +1138,6 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					500,    // 500ms delay between checkpoints
 					10,
-					42,
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
 					new ExecutionVertex[] { vertex1 },
@@ -1235,7 +1223,6 @@ public class CheckpointCoordinatorTest {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				42,
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
@@ -1374,7 +1361,6 @@ public class CheckpointCoordinatorTest {
 				600000,
 				0,
 				Integer.MAX_VALUE,
-				42,
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
 				new ExecutionVertex[] { vertex1, vertex2 },
@@ -1462,7 +1448,6 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					maxConcurrentAttempts,
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -1534,7 +1519,6 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					maxConcurrentAttempts, // max two concurrent checkpoints
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -1615,7 +1599,6 @@ public class CheckpointCoordinatorTest {
 					200000,    // timeout is very long (200 s)
 					0L,        // no extra delay
 					2, // max two concurrent checkpoints
-					42,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex },
 					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 061059a..1816fc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -90,7 +90,6 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
@@ -170,7 +169,6 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0],
@@ -221,7 +219,6 @@ public class CheckpointStateRestoreTest {
 					200000L,
 					0,
 					Integer.MAX_VALUE,
-					42,
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[0], cl,
@@ -263,12 +260,14 @@ public class CheckpointStateRestoreTest {
 		when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
 		when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
 		when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
+		when(mock.getMaxParallelism()).thenReturn(parallelism);
 		return mock;
 	}
 
 	private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(vertices.length);
+		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
 		return vertex;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 49a9449..1de7098 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -112,7 +112,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				100,
 				100,
 				1,
-				42,
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index ed6d1ee..983d6e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -104,6 +104,7 @@ public class SchedulerTestUtils {
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+		when(vertex.getMaxParallelism()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
 		
@@ -121,6 +122,7 @@ public class SchedulerTestUtils {
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+		when(vertex.getMaxParallelism()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
 		Execution execution = mock(Execution.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
deleted file mode 100644
index 393ee4c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-public class DummyEnvironment implements Environment {
-
-	private final JobID jobId = new JobID();
-	private final JobVertexID jobVertexId = new JobVertexID();
-	private final ExecutionAttemptID executionId = new ExecutionAttemptID();
-	private final ExecutionConfig executionConfig = new ExecutionConfig();
-<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
-	private final TaskInfo taskInfo;
-=======
-	private final KvStateRegistry kvStateRegistry = new KvStateRegistry();
-	private final TaskKvStateRegistry taskKvStateRegistry;
->>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
-
-	public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
-		this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0);
-
-		this.taskKvStateRegistry = kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
-	}
-
-	public KvStateRegistry getKvStateRegistry() {
-		return kvStateRegistry;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return executionConfig;
-	}
-
-	@Override
-	public JobID getJobID() {
-		return jobId;
-	}
-
-	@Override
-	public JobVertexID getJobVertexId() {
-		return jobVertexId;
-	}
-
-	@Override
-	public ExecutionAttemptID getExecutionId() {
-		return executionId;
-	}
-
-	@Override
-	public Configuration getTaskConfiguration() {
-		return new Configuration();
-	}
-
-	@Override
-	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return null;
-	}
-
-	@Override
-	public TaskMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup();
-	}
-
-	@Override
-	public Configuration getJobConfiguration() {
-		return new Configuration();
-	}
-
-	@Override
-	public TaskInfo getTaskInfo() {
-		return taskInfo;
-	}
-
-	@Override
-	public InputSplitProvider getInputSplitProvider() {
-		return null;
-	}
-
-	@Override
-	public IOManager getIOManager() {
-		return null;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return null;
-	}
-
-	@Override
-	public ClassLoader getUserClassLoader() {
-		return getClass().getClassLoader();
-	}
-
-	@Override
-	public Map<String, Future<Path>> getDistributedCacheEntries() {
-		return Collections.emptyMap();
-	}
-
-	@Override
-	public BroadcastVariableManager getBroadcastVariableManager() {
-		return null;
-	}
-
-	@Override
-	public AccumulatorRegistry getAccumulatorRegistry() {
-		return null;
-	}
-
-	@Override
-<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
-	public void acknowledgeCheckpoint(long checkpointId) {}
-=======
-	public TaskKvStateRegistry getTaskKvStateRegistry() {
-		return taskKvStateRegistry;
-	}
-
-	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-	}
->>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
-
-	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
-
-	@Override
-	public ResultPartitionWriter getWriter(int index) {
-		return null;
-	}
-
-	@Override
-	public ResultPartitionWriter[] getAllWriters() {
-		return null;
-	}
-
-	@Override
-	public InputGate getInputGate(int index) {
-		return null;
-	}
-
-	@Override
-	public InputGate[] getAllInputGates() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 6998890..1fb34b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -55,7 +56,7 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 import java.util.UUID;
@@ -105,8 +106,13 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *            Function for determining state partitions
 	 */
 	public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
-		super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
-				dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+		super(
+			dataStream.getExecutionEnvironment(),
+			new PartitionTransformation<>(
+				dataStream.getTransformation(),
+				new KeyGroupStreamPartitioner<>(
+					keySelector,
+					new HashKeyGroupAssigner<KEY>())));
 		this.keySelector = keySelector;
 		this.keyType = keyType;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 02ea219..614f19b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 
@@ -106,6 +107,24 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	}
 
 	/**
+	 * Sets the maximum parallelism of this operator.
+	 *
+	 * <p>The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
+	 * number of key groups used for partitioned state.
+	 *
+	 * @param maxParallelism Maximum parallelism, with {@code 0 < maxParallelism <= 2^15}
+	 * @return The operator with set maximum parallelism
+	 * @throws IllegalArgumentException if the maximum parallelism is out of bounds
+	 */
+	@PublicEvolving
+	public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) {
+		// Use the same bounds as StreamExecutionEnvironment#setMaxParallelism, so that a
+		// per-operator value can never exceed what is accepted for the whole program.
+		Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15),
+			"The maximum parallelism must be greater than 0 and at most 2^15. Found: " + maxParallelism);
+
+		transformation.setMaxParallelism(maxParallelism);
+
+		return this;
+	}
+
+	/**
 	 * Sets the parallelism of this operator to one.
 	 * And mark this operator cannot set a non-1 degree of parallelism.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ead9564..78aab97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -18,16 +18,16 @@
 package org.apache.flink.streaming.api.environment;
 
 import com.esotericsoftware.kryo.Serializer;
-
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
@@ -46,22 +46,22 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.api.common.io.FilePathFilter;
-import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -69,7 +69,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SplittableIterator;
@@ -168,6 +167,21 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the maximum degree of parallelism defined for the program.
+	 *
+	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * defines the number of key groups used for partitioned state.
+	 *
+	 * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15
+	 */
+	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15),
+				"maxParallelism is out of bounds 0 < maxParallelism <= 2^15. Found: " + maxParallelism);
+		config.setMaxParallelism(maxParallelism);
+		return this;
+	}
+
+	/**
 	 * Gets the parallelism with which operation are executed by default.
 	 * Operations can individually override this value to use a specific
 	 * parallelism.
@@ -180,6 +194,18 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Gets the maximum degree of parallelism defined for the program.
+	 *
+	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * defines the number of key groups used for partitioned state.
+	 *
+	 * @return Maximum degree of parallelism
+	 */
+	public int getMaxParallelism() {
+		return config.getMaxParallelism();
+	}
+
+	/**
 	 * Sets the maximum time frequency (milliseconds) for the flushing of the
 	 * output buffers. By default the output buffers flush frequently to provide
 	 * low latency and to aid smooth developer experience. Setting the parameter

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 783b3e2..1a92ba4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
@@ -74,6 +75,10 @@ public class StreamConfig implements Serializable {
 	
 	private static final String STATE_BACKEND = "statebackend";
 	private static final String STATE_PARTITIONER = "statePartitioner";
+
+	/** Configuration key under which the serialized {@link KeyGroupAssigner} (key to key group index mapping) is stored. */
+	private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner";
+
 	private static final String STATE_KEY_SERIALIZER = "statekeyser";
 	
 	private static final String TIME_CHARACTERISTIC = "timechar";
@@ -402,10 +407,12 @@ public class StreamConfig implements Serializable {
 	// ------------------------------------------------------------------------
 	
 	public void setStateBackend(AbstractStateBackend backend) {
-		try {
-			InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+		if (backend != null) {
+			try {
+				InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
+			} catch (Exception e) {
+				throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+			}
 		}
 	}
 	
@@ -416,6 +423,10 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
 		}
 	}
+
+	/**
+	 * Returns the state backend in its serialized form, without deserializing it.
+	 *
+	 * @return The serialized state backend bytes, or {@code null} if nothing is stored under the
+	 *         state backend key
+	 */
+	public byte[] getSerializedStateBackend() {
+		final byte[] serializedBackend = config.getBytes(STATE_BACKEND, null);
+		return serializedBackend;
+	}
 	
 	public void setStatePartitioner(int input, KeySelector<?, ?> partitioner) {
 		try {
@@ -432,6 +443,34 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate state partitioner.", e);
 		}
 	}
+
+	/**
+	 * Sets the {@link KeyGroupAssigner} to be used for the current {@link StreamOperator}.
+	 *
+	 * <p>The assigner is serialized into the underlying configuration under the
+	 * {@code KEY_GROUP_ASSIGNER} key.
+	 *
+	 * @param keyGroupAssigner Key group assigner to be used
+	 * @throws StreamTaskException if the key group assigner could not be serialized
+	 */
+	public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) {
+		try {
+			InstantiationUtil.writeObjectToConfig(keyGroupAssigner, this.config, KEY_GROUP_ASSIGNER);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not serialize key group assigner.", e);
+		}
+	}
+
+	/**
+	 * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}.
+	 *
+	 * @param classLoader Classloader to be used for the deserialization
+	 * @param <K> Type of the keys to be assigned to key groups
+	 * @return Key group assigner
+	 * @throws StreamTaskException if the key group assigner could not be deserialized
+	 */
+	public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader classLoader) {
+		try {
+			return InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, classLoader);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate key group assigner.", e);
+		}
+	}
 	
 	public void setStateKeySerializer(TypeSerializer<?> serializer) {
 		try {
@@ -448,6 +487,8 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
 		}
 	}
+
+
 	
 	// ------------------------------------------------------------------------
 	//  Miscellansous

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4be2874..c946e98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -357,6 +358,18 @@ public class StreamGraph extends StreamingPlan {
 			if (partitioner == null) {
 				partitioner = virtuaPartitionNodes.get(virtualId).f1;
 			}
+
+			if (partitioner instanceof ConfigurableStreamPartitioner) {
+				StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+				ConfigurableStreamPartitioner configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner;
+
+				// Configure the partitioner with the max parallelism. This is necessary if the
+				// partitioner has been created before the maximum parallelism has been set. The
+				// maximum parallelism is necessary for the key group mapping.
+				configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism());
+			}
+
 			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
 		} else {
 			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
@@ -407,6 +420,12 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
+	public void setMaxParallelism(int vertexID, int maxParallelism) {
+		// resolve the node once; vertex IDs without a node are silently ignored
+		StreamNode node = getStreamNode(vertexID);
+		if (node != null) { node.setMaxParallelism(maxParallelism); }
+	}
+
 	public void setOneInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
 		StreamNode node = getStreamNode(vertexID);
 		node.setStatePartitioner1(keySelector);
@@ -514,7 +533,13 @@ public class StreamGraph extends StreamingPlan {
 		return vertexIDtoLoopTimeout.get(vertexID);
 	}
 
-	public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
+	public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(
+		int loopId,
+		int sourceId,
+		int sinkId,
+		long timeout,
+		int parallelism,
+		int maxParallelism) {
 		StreamNode source = this.addNode(sourceId,
 			null,
 			StreamIterationHead.class,
@@ -522,6 +547,7 @@ public class StreamGraph extends StreamingPlan {
 			"IterationSource-" + loopId);
 		sources.add(source.getId());
 		setParallelism(source.getId(), parallelism);
+		setMaxParallelism(source.getId(), maxParallelism);
 
 		StreamNode sink = this.addNode(sinkId,
 			null,
@@ -530,6 +556,7 @@ public class StreamGraph extends StreamingPlan {
 			"IterationSink-" + loopId);
 		sinks.add(sink.getId());
 		setParallelism(sink.getId(), parallelism);
+		setMaxParallelism(sink.getId(), maxParallelism);
 
 		iterationSourceSinkPairs.add(new Tuple2<>(source, sink));
 
@@ -604,8 +631,4 @@ public class StreamGraph extends StreamingPlan {
 			}
 		}
 	}
-
-	public static enum ResourceStrategy {
-		DEFAULT, ISOLATE, NEWGROUP
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index de80e25..81c5c48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -144,6 +144,31 @@ public class StreamGraphGenerator {
 
 		LOG.debug("Transforming " + transform);
 
+		if (transform.getMaxParallelism() <= 0) {
+			// If no max parallelism was set on the transformation, use the job-wide value from the
+			// ExecutionConfig; if that is unset as well, fall back to the operator's parallelism
+			// (NOTE(review): that fallback leaves no headroom to scale the operator up later).
+			int maxParallelism = env.getConfig().getMaxParallelism();
+
+			if (maxParallelism <= 0) {
+				maxParallelism = transform.getParallelism();
+
+				/*
+				 * TODO: Remove once the parallelism setting works properly in Flink (FLINK-3885)
+				 * Currently, the parallelism will be set to 1 on the JobManager iff it encounters
+				 * a negative parallelism value. We need to know this for the
+				 * KeyGroupStreamPartitioner on the client side. Thus, we already set the value to
+				 * 1 here.
+				 */
+				if (maxParallelism <= 0) {
+					transform.setParallelism(1);
+					maxParallelism = 1;
+				}
+			}
+
+			transform.setMaxParallelism(maxParallelism);
+		}
+
 		// call at least once to trigger exceptions about MissingTypeInfo
 		transform.getOutputType();
 
@@ -309,11 +334,12 @@ public class StreamGraphGenerator {
 
 		// create the fake iteration source/sink pair
 		Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
-				iterate.getId(),
-				getNewIterationNodeId(),
-				getNewIterationNodeId(),
-				iterate.getWaitTime(),
-				iterate.getParallelism());
+			iterate.getId(),
+			getNewIterationNodeId(),
+			getNewIterationNodeId(),
+			iterate.getWaitTime(),
+			iterate.getParallelism(),
+			iterate.getMaxParallelism());
 
 		StreamNode itSource = itSourceAndSink.f0;
 		StreamNode itSink = itSourceAndSink.f1;
@@ -377,7 +403,8 @@ public class StreamGraphGenerator {
 				getNewIterationNodeId(),
 				getNewIterationNodeId(),
 				coIterate.getWaitTime(),
-				coIterate.getParallelism());
+				coIterate.getParallelism(),
+				coIterate.getMaxParallelism());
 
 		StreamNode itSource = itSourceAndSink.f0;
 		StreamNode itSink = itSourceAndSink.f1;
@@ -430,6 +457,7 @@ public class StreamGraphGenerator {
 			streamGraph.setInputFormat(source.getId(), fs.getFormat());
 		}
 		streamGraph.setParallelism(source.getId(), source.getParallelism());
+		streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
 		return Collections.singleton(source.getId());
 	}
 
@@ -450,6 +478,7 @@ public class StreamGraphGenerator {
 				"Sink: " + sink.getName());
 
 		streamGraph.setParallelism(sink.getId(), sink.getParallelism());
+		streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());
 
 		for (Integer inputId: inputIds) {
 			streamGraph.addEdge(inputId,
@@ -498,6 +527,7 @@ public class StreamGraphGenerator {
 		}
 
 		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
 
 		for (Integer inputId: inputIds) {
 			streamGraph.addEdge(inputId, transform.getId(), 0);
@@ -545,6 +575,7 @@ public class StreamGraphGenerator {
 
 
 		streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
 
 		for (Integer inputId: inputIds1) {
 			streamGraph.addEdge(inputId,

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 430760b..9051891 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Class representing the operators in the streaming programs, with all their properties.
@@ -43,6 +44,11 @@ public class StreamNode implements Serializable {
 
 	private final int id;
 	private Integer parallelism = null;
+	/**
+	 * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
+	 * dynamic scaling and the number of key groups used for partitioned state.
+	 */
+	private int maxParallelism;
 	private Long bufferTimeout = null;
 	private final String operatorName;
 	private String slotSharingGroup;
@@ -141,6 +147,25 @@ public class StreamNode implements Serializable {
 		this.parallelism = parallelism;
 	}
 
+	/**
+	 * Returns the maximum parallelism (upper scaling limit and number of key groups) of this node.
+	 *
+	 * @return Maximum parallelism; 0 if it has not been set yet
+	 */
+	int getMaxParallelism() {
+		return this.maxParallelism;
+	}
+
+	/**
+	 * Updates the maximum parallelism of this node.
+	 *
+	 * @param maxParallelism New maximum parallelism; must be positive
+	 */
+	void setMaxParallelism(int maxParallelism) {
+		Preconditions.checkArgument(maxParallelism >= 1, "The maximum parallelism must be at least 1.");
+		this.maxParallelism = maxParallelism;
+	}
+
 	public Long getBufferTimeout() {
 		return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 28d982c..76fdaca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -289,6 +291,21 @@ public class StreamingJobGraphGenerator {
 
 		if (parallelism > 0) {
 			jobVertex.setParallelism(parallelism);
+		} else {
+			parallelism = jobVertex.getParallelism();
+		}
+
+		int maxParallelism = streamNode.getMaxParallelism();
+
+		// the maximum parallelism specifies the upper bound for the parallelism
+		if (parallelism > maxParallelism) {
+			// the parallelism must always be smaller than or equal to the max parallelism
+			throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " +
+				"the stream node " + streamNode + " is smaller than the parallelism (" +
+				parallelism + "). Increase the maximum parallelism or decrease the parallelism of " +
+				"this operator.");
+		} else {
+			jobVertex.setMaxParallelism(maxParallelism);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -325,7 +342,7 @@ public class StreamingJobGraphGenerator {
 		config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());
 		
 		final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig();
-		
+
 		config.setStateBackend(streamGraph.getStateBackend());
 		config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
 		if (ceckpointCfg.isCheckpointingEnabled()) {
@@ -339,7 +356,15 @@ public class StreamingJobGraphGenerator {
 		config.setStatePartitioner(0, vertex.getStatePartitioner1());
 		config.setStatePartitioner(1, vertex.getStatePartitioner2());
 		config.setStateKeySerializer(vertex.getStateKeySerializer());
-		
+
+		// only set the key group assigner if the vertex uses partitioned state (= KeyedStream).
+		if (vertex.getStatePartitioner1() != null) {
+			// the key group assigner has to know the number of key groups (= maxParallelism)
+			KeyGroupAssigner<Object> keyGroupAssigner = new HashKeyGroupAssigner<Object>(vertex.getMaxParallelism());
+
+			config.setKeyGroupAssigner(keyGroupAssigner);
+		}
+
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 
 		if (vertexClass.equals(StreamIterationHead.class)
@@ -725,8 +750,6 @@ public class StreamingJobGraphGenerator {
 		// stream graph.
 		hasher.putInt(id);
 
-		hasher.putInt(node.getParallelism());
-
 		if (node.getOperator() instanceof AbstractUdfStreamOperator) {
 			String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
 					.getUserFunction().getClass().getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1d2a1bb..e674619 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -117,6 +117,12 @@ public abstract class StreamTransformation<T> {
 	private int parallelism;
 
 	/**
+	 * The maximum parallelism for this stream transformation: the upper limit for dynamic scaling
+	 * and the number of key groups used for partitioned state. Non-positive (default -1) = unset.
+	 */
+	private int maxParallelism = -1;
+
+	/**
 	 * User-specified ID for this transformation. This is used to assign the
 	 * same operator ID across job restarts. There is also the automatically
 	 * generated {@link #id}, which is assigned from a static counter. That
@@ -181,6 +187,24 @@ public abstract class StreamTransformation<T> {
 	}
 
 	/**
+	 * Returns the maximum parallelism configured for this transformation.
+	 *
+	 * @return Maximum parallelism, or a non-positive value (-1 by default) if none was set
+	 */
+	public int getMaxParallelism() {
+		return this.maxParallelism;
+	}
+
+	/**
+	 * Configures the maximum parallelism of this transformation.
+	 *
+	 * @param maxParallelism Maximum parallelism to use for this transformation
+	 */
+	public void setMaxParallelism(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+	}
+
+	/**
 	 * Sets an ID for this {@link StreamTransformation}.
 	 *
 	 * <p>The specified ID is used to assign the same operator ID across job

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
new file mode 100644
index 0000000..c59c88a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+/**
+ * Interface for {@link StreamPartitioner}s that need to know the maximum parallelism of the
+ * downstream operator. The StreamGraph calls {@link #configure(int)} when it adds an internal
+ * edge that uses such a partitioner.
+ *
+ * <p>This is necessary because stream partitioners are instantiated eagerly, possibly before
+ * the maximum parallelism has been determined. The value is therefore handed over later, at a
+ * point where it is known.
+ */
+public interface ConfigurableStreamPartitioner {
+
+	/**
+	 * Configures the {@link StreamPartitioner} with the maximum parallelism of the downstream
+	 * operator.
+	 *
+	 * @param maxParallelism Maximum parallelism of the downstream operator.
+	 */
+	void configure(int maxParallelism);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
deleted file mode 100644
index 3c93fb7..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner selects the target channel based on the hash value of a key from a
- * {@link KeySelector}.
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-@Internal
-public class HashPartitioner<T> extends StreamPartitioner<T> {
-	private static final long serialVersionUID = 1L;
-
-	private int[] returnArray = new int[1];
-	KeySelector<T, ?> keySelector;
-
-	public HashPartitioner(KeySelector<T, ?> keySelector) {
-		this.keySelector = keySelector;
-	}
-
-	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-		Object key;
-		try {
-			key = keySelector.getKey(record.getInstance().getValue());
-		} catch (Exception e) {
-			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
-		}
-		returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
-
-		return returnArray;
-	}
-
-	@Override
-	public StreamPartitioner<T> copy() {
-		return this;
-	}
-
-	@Override
-	public String toString() {
-		return "HASH";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
new file mode 100644
index 0000000..5bcf41b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Partitioner that selects the target channel from the key group of each element. The key group
+ * is derived from the element's key by the given {@link KeyGroupAssigner}.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ * @param <K> Type of the keys extracted from the elements
+ */
+@Internal
+public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
+	private static final long serialVersionUID = 1L;
+
+	/** Single-slot array handed back by every selectChannels call. */
+	private final int[] returnArray = new int[1];
+
+	private final KeySelector<T, K> keySelector;
+	private final KeyGroupAssigner<K> keyGroupAssigner;
+
+	public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, KeyGroupAssigner<K> keyGroupAssigner) {
+		this.keySelector = Preconditions.checkNotNull(keySelector);
+		this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+	}
+
+	public KeyGroupAssigner<K> getKeyGroupAssigner() {
+		return this.keyGroupAssigner;
+	}
+
+	@Override
+	public void configure(int maxParallelism) {
+		// the assigner needs the number of key groups, i.e. the downstream max parallelism
+		this.keyGroupAssigner.setup(maxParallelism);
+	}
+
+	@Override
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+		final T element = record.getInstance().getValue();
+		final K key;
+		try {
+			key = keySelector.getKey(element);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not extract key from " + element, e);
+		}
+		// fold the key group index onto the available output channels
+		returnArray[0] = keyGroupAssigner.getKeyGroupIndex(key) % numberOfOutputChannels;
+		return returnArray;
+	}
+
+	@Override
+	public StreamPartitioner<T> copy() {
+		return this;
+	}
+
+	@Override
+	public String toString() {
+		return "HASH";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index d0617d0..1183306 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -46,7 +46,7 @@ import org.junit.Test;
 public class AggregationFunctionTest {
 
 	@Test
-	public void groupSumIntegerTest() {
+	public void groupSumIntegerTest() throws Exception {
 
 		// preparing expected outputs
 		List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
@@ -115,7 +115,7 @@ public class AggregationFunctionTest {
 	}
 
 	@Test
-	public void pojoGroupSumIntegerTest() {
+	public void pojoGroupSumIntegerTest() throws Exception {
 
 		// preparing expected outputs
 		List<MyPojo> expectedGroupSumList = new ArrayList<>();
@@ -183,7 +183,7 @@ public class AggregationFunctionTest {
 	}
 	
 	@Test
-	public void minMaxByTest() {
+	public void minMaxByTest() throws Exception {
 		// Tuples are grouped on field 0, aggregated on field 1
 		
 		// preparing expected outputs
@@ -250,7 +250,7 @@ public class AggregationFunctionTest {
 	}
 
 	@Test
-	public void pojoMinMaxByTest() {
+	public void pojoMinMaxByTest() throws Exception {
 		// Pojos are grouped on field 0, aggregated on field 1
 
 		// preparing expected outputs