You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/17 22:03:59 UTC

[1/3] flink git commit: [FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Repository: flink
Updated Branches:
  refs/heads/master 20fff32a5 -> f31a55e08


[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there were some checks in
StreamExecutionEnvironment.set(Max)Parallelism(), but a user could
circumvent these by using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f31a55e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f31a55e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f31a55e0

Branch: refs/heads/master
Commit: f31a55e08ddb7b4bc9e47577a187bac31ad42f4b
Parents: e4fbae3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 14:35:37 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 17 21:34:54 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 35 +++++++++----
 .../environment/StreamExecutionEnvironment.java |  8 ---
 .../StreamExecutionEnvironmentTest.java         | 54 ++++++++++++++++++++
 .../api/graph/StreamGraphGeneratorTest.java     |  3 ++
 ...tractEventTimeWindowCheckpointingITCase.java |  2 +-
 5 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f31a55e0/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 9af9cff..c18db52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	public static final int PARALLELISM_UNKNOWN = -2;
 
 	/**
-	 * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
-	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+	 * The default lower bound for max parallelism if nothing was configured by the user. We have
+	 * this to allow users some degree of scale-up in case they forgot to configure maximum
+	 * parallelism explicitly.
 	 */
 	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
 
@@ -292,13 +293,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		if (parallelism != PARALLELISM_UNKNOWN) {
-			if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
-				throw new IllegalArgumentException(
-					"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
-			}
-			this.parallelism = parallelism;
-		}
+		checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
+		checkArgument(
+				parallelism >= 1 || parallelism == PARALLELISM_DEFAULT,
+				"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
+						"(use system default).");
+		checkArgument(
+				maxParallelism == -1 || parallelism <= maxParallelism,
+				"The specified parallelism must be smaller or equal to the maximum parallelism.");
+		checkArgument(
+				maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT,
+				"Default parallelism cannot be specified when maximum parallelism is specified");
+		this.parallelism = parallelism;
 		return this;
 	}
 
@@ -325,7 +331,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	@PublicEvolving
 	public void setMaxParallelism(int maxParallelism) {
+		checkArgument(
+				parallelism != PARALLELISM_DEFAULT,
+				"A maximum parallelism can only be specified with an explicitly specified " +
+						"parallelism.");
 		checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+		checkArgument(
+				maxParallelism >= parallelism,
+				"The maximum parallelism must be larger than the parallelism.");
+		checkArgument(
+				maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+				"maxParallelism is out of bounds 0 < maxParallelism <= " +
+						UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 		this.maxParallelism = maxParallelism;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f31a55e0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f443597..70807fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -168,9 +168,6 @@ public abstract class StreamExecutionEnvironment {
 	 * @param parallelism The parallelism
 	 */
 	public StreamExecutionEnvironment setParallelism(int parallelism) {
-		if (parallelism < 1) {
-			throw new IllegalArgumentException("parallelism must be at least one.");
-		}
 		config.setParallelism(parallelism);
 		return this;
 	}
@@ -184,11 +181,6 @@ public abstract class StreamExecutionEnvironment {
 	 * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0 &&
-						maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
-				"maxParallelism is out of bounds 0 < maxParallelism <= " +
-						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
-
 		config.setMaxParallelism(maxParallelism);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f31a55e0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index d29c833..fd27179 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -36,7 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock;
 
 public class StreamExecutionEnvironmentTest {
 
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+
 	@Test
 	public void fromElementsWithBaseTypeTest1() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -156,6 +162,53 @@ public class StreamExecutionEnvironmentTest {
 	}
 
 	@Test
+	public void testMaxParallelismMustBeBiggerEqualParallelism() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(10);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setMaxParallelism(5);
+	}
+
+	@Test
+	public void testParallelismMustBeSmallerEqualMaxParallelism() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(10);
+		env.setMaxParallelism(20);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(30);
+	}
+
+	@Test
+	public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
+		final int defaultParallelism = 20;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+		env.setParallelism(10);
+		env.setMaxParallelism(15);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+	}
+
+	@Test
+	public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
+		final int defaultParallelism = 20;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+		env.setParallelism(10);
+		env.setMaxParallelism(15);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+	}
+
+	@Test
 	public void testParallelismBounds() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest {
 		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
 
 		// configured value after generating
+		env.setParallelism(21);
 		env.setMaxParallelism(42);
 		env.getStreamGraph().getJobGraph();
 		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/f31a55e0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5fdacd4..fbbb5d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest {
 	public void testSetupOfKeyGroupPartitioner() {
 		int maxParallelism = 42;
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setParallelism(12);
 		env.getConfig().setMaxParallelism(maxParallelism);
 
 		DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest {
 		int keyedResult2MaxParallelism = 17;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setParallelism(12);
 		env.getConfig().setMaxParallelism(globalMaxParallelism);
 
 		DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest {
 		DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
 		DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
 
+		env.setParallelism(12);
 		env.getConfig().setMaxParallelism(maxParallelism);
 
 		DataStream<Integer> keyedResult = input1.connect(input2).keyBy(

http://git-wip-us.apache.org/repos/asf/flink/blob/f31a55e0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 5e966d1..5fc2083 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -298,8 +298,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 					"localhost", cluster.getLeaderRPCPort());
 
-			env.setMaxParallelism(2 * PARALLELISM);
 			env.setParallelism(PARALLELISM);
+			env.setMaxParallelism(2 * PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));


[2/3] flink git commit: [FLINK-5808] Move max keygroup constants to ExecutionConfig

Posted by al...@apache.org.
[FLINK-5808] Move max keygroup constants to ExecutionConfig

We need to have them there if we want to properly test the arguments of
setMaxParallelism() in the ExecutionConfig itself.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4fbae36
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4fbae36
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4fbae36

Branch: refs/heads/master
Commit: e4fbae36207c563363eed39886c24eea51d7db01
Parents: 9cfae89
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 14:37:26 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 17 21:34:54 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/api/common/ExecutionConfig.java    |  9 +++++++++
 .../runtime/executiongraph/ExecutionJobVertex.java  |  6 +++---
 .../runtime/executiongraph/ExecutionVertex.java     |  4 ++--
 .../runtime/state/KeyGroupRangeAssignment.java      | 16 ++++------------
 .../api/environment/StreamExecutionEnvironment.java |  5 ++---
 .../streaming/api/graph/StreamGraphGenerator.java   |  6 +++---
 6 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1..9af9cff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -83,6 +83,15 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	public static final int PARALLELISM_UNKNOWN = -2;
 
+	/**
+	 * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
+	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+	 */
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+	/** The (inclusive) upper bound for max parallelism */
+	public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
+
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 852d530..545315f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -225,13 +225,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private void setMaxParallelismInternal(int maxParallelism) {
 		if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-			maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+			maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
 		}
 
 		Preconditions.checkArgument(maxParallelism > 0
-						&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+						&& maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
 				"Overriding max parallelism is not in valid bounds (1..%s), found: %s",
-				KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
+				ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
 
 		this.maxParallelism = maxParallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 21af73a..9693b97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -663,7 +663,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				//TODO this case only exists for test, currently there has to be exactly one consumer in real jobs!
 				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
 						partition,
-						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
 						lazyScheduling));
 			} else {
 				Preconditions.checkState(1 == consumers.size(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 62bf3f6..bf0611b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -18,20 +18,12 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 
 public final class KeyGroupRangeAssignment {
 
-	/**
-	 * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
-	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
-	 */
-	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
-
-	/** The (inclusive) upper bound for max parallelism */
-	public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
-
 	private KeyGroupRangeAssignment() {
 		throw new AssertionError();
 	}
@@ -130,13 +122,13 @@ public final class KeyGroupRangeAssignment {
 		return Math.min(
 				Math.max(
 						MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
-						DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
-				UPPER_BOUND_MAX_PARALLELISM);
+						ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+				ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM);
 	}
 
 	public static void checkParallelismPreconditions(int parallelism) {
 		Preconditions.checkArgument(parallelism > 0
-						&& parallelism <= UPPER_BOUND_MAX_PARALLELISM,
+						&& parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
 				"Operator parallelism not within bounds: " + parallelism);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ac3eadb..f443597 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -49,7 +49,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -186,9 +185,9 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
 		Preconditions.checkArgument(maxParallelism > 0 &&
-						maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+						maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
 				"maxParallelism is out of bounds 0 < maxParallelism <= " +
-						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 
 		config.setMaxParallelism(maxParallelism);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index b3b6529..2defbef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -78,8 +78,8 @@ public class StreamGraphGenerator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
 
-	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
-	public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+	public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
 
 	// The StreamGraph that is being built, this is initialized at the beginning.
 	private final StreamGraph streamGraph;


[3/3] flink git commit: [FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Posted by al...@apache.org.
[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Before, it was set on the ExecutionConfig for some stream execution
environments and later for others. Now, we don't set the default
parallelism on the ExecutionConfig but instead set it at the latest
possible point, in the StreamingJobGraphGenerator.

This also adds tests that verify that we don't set the default
parallelism on the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cfae899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cfae899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cfae899

Branch: refs/heads/master
Commit: 9cfae899358e0694c3ecedae1fad20e428a1d359
Parents: 20fff32
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 13:30:21 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 17 21:34:54 2017 +0100

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            |   4 +
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 +
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 -----------------
 .../StreamExecutionEnvironmentTest.java         | 317 +++++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  10 +-
 .../FoldApplyProcessWindowFunctionTest.java     |   8 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 +
 .../accumulators/AccumulatorLiveITCase.java     |   4 +
 18 files changed, 437 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 63dc35d..4a5f20d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -46,6 +46,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
 	/** The configuration to use for the mini cluster */
 	private final Configuration conf;
 
@@ -62,6 +65,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public Flip6LocalStreamEnvironment(Configuration config) {
+		super(defaultLocalParallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f8c9c42..cb60552 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
 	
 	/** The configuration to use for the local cluster */
@@ -54,24 +57,43 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * Creates a new local stream environment that uses the default configuration.
 	 */
 	public LocalStreamEnvironment() {
-		this(null);
+		this(defaultLocalParallelism);
 	}
 
 	/**
+	 * Creates a new local stream environment that uses the default configuration.
+	 */
+	public LocalStreamEnvironment(int parallelism) {
+		this(null, parallelism);
+	}
+
+
+	/**
 	 * Creates a new local stream environment that configures its local executor with the given configuration.
 	 *
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public LocalStreamEnvironment(Configuration config) {
+		this(config, defaultLocalParallelism);
+	}
+
+	/**
+	 * Creates a new local stream environment that configures its local executor with the given configuration.
+	 *
+	 * @param config The configuration used to configure the local executor.
+	 */
+	public LocalStreamEnvironment(Configuration config, int parallelism) {
+		super(parallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
 							"or running in a TestEnvironment context.");
 		}
-		
+
 		this.conf = config == null ? new Configuration() : config;
 	}
 
+
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
 	 * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 333f9c0..5684e28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,6 +37,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,10 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
+		super(GlobalConfiguration.loadConfiguration().getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM));
+		
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 49c5347..51078f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,14 +38,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	private final ContextEnvironment ctx;
 
 	protected StreamContextEnvironment(ContextEnvironment ctx) {
+		// if the batch ContextEnvironment has a parallelism this must have come from
+		// the CLI Client. We should set that as our default parallelism
+		super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
+				GlobalConfiguration.loadConfiguration().getInteger(
+						ConfigConstants.DEFAULT_PARALLELISM_KEY,
+						ConfigConstants.DEFAULT_PARALLELISM));
 		this.ctx = ctx;
-		if (ctx.getParallelism() > 0) {
-			setParallelism(ctx.getParallelism());
-		} else {
-			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
-					ConfigConstants.DEFAULT_PARALLELISM));
-		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e299e84..ac3eadb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -112,9 +112,6 @@ public abstract class StreamExecutionEnvironment {
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
 
-	/** The default parallelism used when creating a local environment */
-	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
 	// ------------------------------------------------------------------------
 
 	/** The execution configuration for this environment */
@@ -135,11 +132,23 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
+	/** The parallelism to use when no parallelism is set on an operation. */
+	private final int defaultParallelism;
+
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
 
+
+	public StreamExecutionEnvironment() {
+		this(ConfigConstants.DEFAULT_PARALLELISM);
+	}
+
+	public StreamExecutionEnvironment(int defaultParallelism) {
+		this.defaultParallelism = defaultParallelism;
+	}
+
 	/**
 	 * Gets the config object.
 	 */
@@ -1513,7 +1522,7 @@ public abstract class StreamExecutionEnvironment {
 		if (transformations.size() <= 0) {
 			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
 		}
-		return StreamGraphGenerator.generate(this, transformations);
+		return StreamGraphGenerator.generate(this, transformations, defaultParallelism);
 	}
 
 	/**
@@ -1601,7 +1610,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment() {
-		return createLocalEnvironment(defaultLocalParallelism);
+		return new LocalStreamEnvironment();
 	}
 
 	/**
@@ -1610,14 +1619,12 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param parallelism
-	 * 		The parallelism for the local environment.
+	 * @param defaultParallelism The default parallelism for the local environment.
+	 * 
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
-		LocalStreamEnvironment env = new LocalStreamEnvironment();
-		env.setParallelism(parallelism);
-		return env;
+	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) {
+		return new LocalStreamEnvironment(defaultParallelism);
 	}
 
 	/**
@@ -1626,16 +1633,13 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param parallelism
-	 * 		The parallelism for the local environment.
-	 * 	@param configuration
-	 * 		Pass a custom configuration into the cluster
+	 * @param defaultParallelism The default parallelism for the local environment.
+	 * @param configuration Pass a custom configuration into the cluster
+	 *
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
-		LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
-		currentEnvironment.setParallelism(parallelism);
-		return currentEnvironment;
+	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) {
+		return new LocalStreamEnvironment(configuration, defaultParallelism);
 	}
 
 	/**
@@ -1660,7 +1664,6 @@ public abstract class StreamExecutionEnvironment {
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
-		localEnv.setParallelism(defaultLocalParallelism);
 
 		return localEnv;
 	}
@@ -1746,28 +1749,6 @@ public abstract class StreamExecutionEnvironment {
 		return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
 	}
 
-	/**
-	 * Gets the default parallelism that will be used for the local execution environment created by
-	 * {@link #createLocalEnvironment()}.
-	 *
-	 * @return The default local parallelism
-	 */
-	@PublicEvolving
-	public static int getDefaultLocalParallelism() {
-		return defaultLocalParallelism;
-	}
-
-	/**
-	 * Sets the default parallelism that will be used for the local execution
-	 * environment created by {@link #createLocalEnvironment()}.
-	 *
-	 * @param parallelism The parallelism to use as the default local parallelism.
-	 */
-	@PublicEvolving
-	public static void setDefaultLocalParallelism(int parallelism) {
-		defaultLocalParallelism = parallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index b1521f5..9c676c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,18 +32,11 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 	private ExecutionEnvironment env;
 
 	protected StreamPlanEnvironment(ExecutionEnvironment env) {
-		super();
-		this.env = env;
+		super(GlobalConfiguration.loadConfiguration().getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM));
 
-		int parallelism = env.getParallelism();
-		if (parallelism > 0) {
-			setParallelism(parallelism);
-		} else {
-			// determine parallelism
-			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
-					ConfigConstants.DEFAULT_PARALLELISM));
-		}
+		this.env = env;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c1775e4..c8d5340 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -92,12 +92,14 @@ public class StreamGraph extends StreamingPlan {
 	private AbstractStateBackend stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
+	private final int defaultParallelism;
 
-	public StreamGraph(StreamExecutionEnvironment environment) {
+	public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) {
 		this.environment = environment;
 		this.executionConfig = environment.getConfig();
 		this.checkpointConfig = environment.getCheckpointConfig();
 
+		this.defaultParallelism = defaultParallelism;
 		// create an empty new stream graph.
 		clear();
 	}
@@ -607,7 +609,7 @@ public class StreamGraph extends StreamingPlan {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
+		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
 
 		return jobgraphGenerator.createJobGraph();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f4d4071..b3b6529 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -97,12 +97,11 @@ public class StreamGraphGenerator {
 	// we have loops, i.e. feedback edges.
 	private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
 
-
 	/**
 	 * Private constructor. The generator should only be invoked using {@link #generate}.
 	 */
-	private StreamGraphGenerator(StreamExecutionEnvironment env) {
-		this.streamGraph = new StreamGraph(env);
+	private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) {
+		this.streamGraph = new StreamGraph(env, defaultParallelism);
 		this.streamGraph.setChaining(env.isChainingEnabled());
 		this.streamGraph.setStateBackend(env.getStateBackend());
 		this.env = env;
@@ -119,8 +118,11 @@ public class StreamGraphGenerator {
 	 *
 	 * @return The generated {@code StreamGraph}
 	 */
-	public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
-		return new StreamGraphGenerator(env).generateInternal(transformations);
+	public static StreamGraph generate(
+			StreamExecutionEnvironment env,
+			List<StreamTransformation<?>> transformations,
+			int defaultParallelism) {
+		return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5c1e1ac..60f8faa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -92,10 +93,13 @@ public class StreamingJobGraphGenerator {
 	private final StreamGraphHasher defaultStreamGraphHasher;
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+	private final int defaultParallelism;
+
+	public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+		this.defaultParallelism = defaultParallelism;
 	}
 
 	private void init() {
@@ -338,12 +342,12 @@ public class StreamingJobGraphGenerator {
 
 		int parallelism = streamNode.getParallelism();
 
-		if (parallelism > 0) {
-			jobVertex.setParallelism(parallelism);
-		} else {
-			parallelism = jobVertex.getParallelism();
+		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+			parallelism = defaultParallelism;
 		}
 
+		jobVertex.setParallelism(parallelism);
+
 		jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index 3fc1344..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class StreamExecutionEnvironmentTest {
-
-	@Test
-	public void fromElementsWithBaseTypeTest1() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void fromElementsWithBaseTypeTest2() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testFromCollectionParallelism() {
-		try {
-			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-			try {
-				dataStream1.setParallelism(4);
-				fail("should throw an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			dataStream1.addSink(new DiscardingSink<Integer>());
-	
-			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-					typeInfo).setParallelism(4);
-
-			dataStream2.addSink(new DiscardingSink<Integer>());
-
-			env.getExecutionPlan();
-
-			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-			assertEquals("Parallelism of parallel collection source must be 4.",
-					4, 
-					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSources() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-		DataStreamSource<Integer> src1 = env.addSource(srcFun);
-		src1.addSink(new DiscardingSink<Integer>());
-		assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-		List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
-
-		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
-
-		DataStreamSource<Long> src4 = env.fromCollection(list);
-		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
-	}
-
-	@Test
-	public void testParallelismBounds() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-
-
-		SingleOutputStreamOperator<Object> operator =
-				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<Object> out) throws Exception {
-
-			}
-		});
-
-		// default value for max parallelism
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for parallelism 1
-		try {
-			operator.setParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for parallelism 2
-		operator.setParallelism(1);
-		Assert.assertEquals(1, operator.getParallelism());
-
-		// bounds for parallelism 3
-		operator.setParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getParallelism());
-
-		// default value after generating
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// configured value after generating
-		env.setMaxParallelism(42);
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
-
-		// bounds configured parallelism 1
-		try {
-			env.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds configured parallelism 2
-		try {
-			env.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 1
-		try {
-			operator.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 2
-		try {
-			operator.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 3
-		operator.setMaxParallelism(1);
-		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for max parallelism 4
-		operator.setMaxParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
-
-		// override config
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
-	}
-
-	/////////////////////////////////////////////////////////////
-	// Utilities
-	/////////////////////////////////////////////////////////////
-
-
-	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
-		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-		StreamGraph streamGraph = env.getStreamGraph();
-		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-		dataStreamSource.addSink(new DiscardingSink<T>());
-		AbstractUdfStreamOperator<?, ?> operator =
-				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
-		return (SourceFunction<T>) operator.getUserFunction();
-	}
-
-	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
-		private static final long serialVersionUID = 1312752876092210499L;
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Iterator<T>[] split(int numPartitions) {
-			return (Iterator<T>[]) new Iterator<?>[0];
-		}
-
-		@Override
-		public int getMaximumNumberOfSplits() {
-			return 0;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return false;
-		}
-
-		@Override
-		public T next() {
-			throw new NoSuchElementException();
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	public static class ParentClass {
-		int num;
-		String string;
-		public ParentClass(int num, String string) {
-			this.num = num;
-			this.string = string;
-		}
-	}
-
-	public static class SubClass extends ParentClass{
-		public SubClass(int num, String string) {
-			super(num, string);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..d29c833
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class StreamExecutionEnvironmentTest {
+
+	@Test
+	public void fromElementsWithBaseTypeTest1() {
+		// the declared element type is the parent class, so mixing in a SubClass element must not throw
+		StreamExecutionEnvironment.getExecutionEnvironment().fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void fromElementsWithBaseTypeTest2() {
+		// the declared element type is SubClass, so the ParentClass element is expected to be rejected
+		StreamExecutionEnvironment.getExecutionEnvironment().fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	/**
+	 * Checks that a {@code fromCollection} source refuses a parallelism of 4, while a
+	 * {@code fromParallelCollection} source accepts it.
+	 *
+	 * @throws Exception any unexpected failure, reported by JUnit with its full stack trace
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFromCollectionParallelism() throws Exception {
+		TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+		try {
+			dataStream1.setParallelism(4);
+			fail("should throw an exception");
+		}
+		catch (IllegalArgumentException e) {
+			// expected
+		}
+
+		dataStream1.addSink(new DiscardingSink<Integer>());
+
+		DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+				typeInfo).setParallelism(4);
+
+		dataStream2.addSink(new DiscardingSink<Integer>());
+
+		env.getExecutionPlan();
+
+		assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+		assertEquals("Parallelism of parallel collection source must be 4.",
+				4,
+				env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+	}
+
+	/** Checks which source function each source factory method of the environment installs. */
+	@Test
+	public void testSources() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// a user-supplied function is expected to be returned as-is
+		SourceFunction<Integer> noOpSource = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+		DataStreamSource<Integer> customSource = env.addSource(noOpSource);
+		customSource.addSink(new DiscardingSink<Integer>());
+		assertEquals(noOpSource, getFunctionFromDataSource(customSource));
+
+		DataStreamSource<Long> sequenceSource = env.generateSequence(0, 2);
+		assertTrue(getFunctionFromDataSource(sequenceSource) instanceof StatefulSequenceSource);
+
+		DataStreamSource<Long> elementsSource = env.fromElements(0L, 1L, 2L);
+		assertTrue(getFunctionFromDataSource(elementsSource) instanceof FromElementsFunction);
+
+		DataStreamSource<Long> collectionSource = env.fromCollection(Arrays.asList(0L, 1L, 2L));
+		assertTrue(getFunctionFromDataSource(collectionSource) instanceof FromElementsFunction);
+	}
+
+	/**
+	 * Checks that local, remote and context environments all report
+	 * {@link ExecutionConfig#PARALLELISM_DEFAULT} when no parallelism was set.
+	 */
+	@Test
+	public void testDefaultParallelismIsDefault() {
+		final int expected = ExecutionConfig.PARALLELISM_DEFAULT;
+		StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
+		assertEquals(expected, localEnv.getParallelism());
+
+		StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234);
+		assertEquals(expected, remoteEnv.getParallelism());
+
+		ContextEnvironment wrapped = new ContextEnvironment(
+				mock(ClusterClient.class),
+				Collections.<URL>emptyList(),
+				Collections.<URL>emptyList(),
+				this.getClass().getClassLoader(),
+				null);
+		StreamExecutionEnvironment contextEnv = new StreamContextEnvironment(wrapped);
+		assertEquals(expected, contextEnv.getParallelism());
+	}
+
+	@Test
+	public void testParallelismBounds() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+
+
+		SingleOutputStreamOperator<Object> operator =
+				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+			}
+		});
+
+		// before anything is configured the transformation reports -1 as max parallelism
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// operator parallelism: 0 must be rejected
+		try {
+			operator.setParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// operator parallelism: 1 is accepted and read back
+		operator.setParallelism(1);
+		Assert.assertEquals(1, operator.getParallelism());
+
+		// operator parallelism: 1 << 15 is accepted and read back
+		operator.setParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getParallelism());
+
+		// generating the job graph leaves the unset max parallelism at -1
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// a max parallelism set on the environment is visible on the transformation after generating
+		env.setMaxParallelism(42);
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+		// environment max parallelism: 0 must be rejected
+		try {
+			env.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// environment max parallelism: 1 + (1 << 15) must be rejected
+		try {
+			env.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// operator max parallelism: 0 must be rejected
+		try {
+			operator.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// operator max parallelism: 1 + (1 << 15) must be rejected
+		try {
+			operator.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// operator max parallelism: 1 is accepted and read back
+		operator.setMaxParallelism(1);
+		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+		// operator max parallelism: 1 << 15 is accepted and read back
+		operator.setMaxParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+		// after generating again the operator-level value is kept instead of the environment's 42
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+
+	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
+		// resolve the stream's node in the graph obtained from its own environment
+		StreamGraph graph = dataStream.getExecutionEnvironment().getStreamGraph();
+		return graph.getStreamNode(dataStream.getId()).getOperator();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+		// attach a sink first, then read the user function back from the source's operator
+		dataStreamSource.addSink(new DiscardingSink<T>());
+		StreamOperator<?> sourceOperator = getOperatorFromDataStream(dataStreamSource);
+		return (SourceFunction<T>) ((AbstractUdfStreamOperator<?, ?>) sourceOperator).getUserFunction();
+	}
+
+	/** Iterator that has no elements and produces no splits. */
+	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
+		private static final long serialVersionUID = 1312752876092210499L;
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+
+		@Override
+		public T next() {
+			throw new NoSuchElementException();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return 0;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public Iterator<T>[] split(int numPartitions) {
+			return (Iterator<T>[]) new Iterator<?>[0];
+		}
+	}
+
+	public static class ParentClass { // base element type passed to fromElements in the tests of this class
+		int num;
+		String string;
+		public ParentClass(int num, String string) {
+			this.num = num;
+			this.string = string;
+		}
+	}
+
+	public static class SubClass extends ParentClass{ // subtype of ParentClass; only forwards its arguments
+		public SubClass(int num, String string) {
+			super(num, string);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6d2fcaa..abf51ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -112,10 +112,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testDisabledCheckpointing() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamGraph streamGraph = new StreamGraph(env);
+		StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */);
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
 		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
@@ -137,7 +137,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				}
 			})
 			.print();
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +224,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		});
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 		JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +291,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		}).disableChaining().name("test_sink");
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			if (jobVertex.getName().contains("test_source")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 734879d..4b479f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -123,7 +123,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -218,7 +218,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -246,6 +246,10 @@ public class FoldApplyProcessWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1);
+		}
+
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index fecd440..6ddca34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -117,7 +117,7 @@ public class FoldApplyWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -140,6 +140,10 @@ public class FoldApplyWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1);
+		}
+
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 22f1264..60798e0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -673,23 +673,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
-  /**
-   * Sets the default parallelism that will be used for the local execution
-   * environment created by [[createLocalEnvironment()]].
-   *
-   * @param parallelism The default parallelism to use for local execution.
-   */
-  @PublicEvolving
-  def setDefaultLocalParallelism(parallelism: Int) : Unit =
-    JavaEnv.setDefaultLocalParallelism(parallelism)
-
-  /**
-   * Gets the default parallelism that will be used for the local execution environment created by
-   * [[createLocalEnvironment()]].
-   */
-  @PublicEvolving
-  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
-  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -711,13 +694,14 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment will run the
    * program in a multi-threaded fashion in the same JVM as the environment was created in.
-   *
-   * This method sets the environment's default parallelism to given parameter, which
-   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
+  def createLocalEnvironment(parallelism: Int = -1):
       StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    if (parallelism == -1) {
+      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
+    } else {
+      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 60c609d..08153be 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,9 +255,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // default parallelism is only actualized when transforming to JobGraph
+    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -272,9 +273,11 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // setting a parallelism on the env/in the ExecutionConfig means that operators
+    // pick it up when being instantiated
+    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 64c68dc..90d8790 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,6 +36,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 
 	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
+		super(parallelism);
 		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index c56fa91..883f4b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -381,6 +381,10 @@ public class AccumulatorLiveITCase {
 	 */
 	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1 /* default parallelism */);
+		}
+
 		@Override
 		public JobExecutionResult execute() throws Exception {
 			return execute("default");