You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/17 22:04:01 UTC

[3/3] flink git commit: [FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Before, it was set on the ExecutionConfig for some stream execution
environments and later for others. Now, we don't set the default
parallelism on the ExecutionConfig but instead set it at the latest
possible point, in the StreamingJobGraphGenerator.

This also adds tests that verify that we don't set the default
parallelism on the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9cfae899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9cfae899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9cfae899

Branch: refs/heads/master
Commit: 9cfae899358e0694c3ecedae1fad20e428a1d359
Parents: 20fff32
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 13:30:21 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 17 21:34:54 2017 +0100

----------------------------------------------------------------------
 .../Flip6LocalStreamEnvironment.java            |   4 +
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 +
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 -----------------
 .../StreamExecutionEnvironmentTest.java         | 317 +++++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  10 +-
 .../FoldApplyProcessWindowFunctionTest.java     |   8 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 +
 .../accumulators/AccumulatorLiveITCase.java     |   4 +
 18 files changed, 437 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 63dc35d..4a5f20d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -46,6 +46,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
 	/** The configuration to use for the mini cluster */
 	private final Configuration conf;
 
@@ -62,6 +65,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public Flip6LocalStreamEnvironment(Configuration config) {
+		super(defaultLocalParallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f8c9c42..cb60552 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+	/** The default parallelism used when creating a local environment */
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
+
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
 	
 	/** The configuration to use for the local cluster */
@@ -54,24 +57,43 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * Creates a new local stream environment that uses the default configuration.
 	 */
 	public LocalStreamEnvironment() {
-		this(null);
+		this(defaultLocalParallelism);
 	}
 
 	/**
+	 * Creates a new local stream environment that uses the default configuration.
+	 */
+	public LocalStreamEnvironment(int parallelism) {
+		this(null, parallelism);
+	}
+
+
+	/**
 	 * Creates a new local stream environment that configures its local executor with the given configuration.
 	 *
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public LocalStreamEnvironment(Configuration config) {
+		this(config, defaultLocalParallelism);
+	}
+
+	/**
+	 * Creates a new local stream environment that configures its local executor with the given configuration.
+	 *
+	 * @param config The configuration used to configure the local executor.
+	 */
+	public LocalStreamEnvironment(Configuration config, int parallelism) {
+		super(parallelism);
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The LocalStreamEnvironment cannot be used when submitting a program through a client, " +
 							"or running in a TestEnvironment context.");
 		}
-		
+
 		this.conf = config == null ? new Configuration() : config;
 	}
 
+
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
 	 * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 333f9c0..5684e28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,6 +37,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,10 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	 *            The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
 	public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
+		super(GlobalConfiguration.loadConfiguration().getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM));
+		
 		if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
 			throw new InvalidProgramException(
 					"The RemoteEnvironment cannot be used when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 49c5347..51078f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,14 +38,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	private final ContextEnvironment ctx;
 
 	protected StreamContextEnvironment(ContextEnvironment ctx) {
+		// if the batch ContextEnvironment has a parallelism this must have come from
+		// the CLI Client. We should set that as our default parallelism
+		super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
+				GlobalConfiguration.loadConfiguration().getInteger(
+						ConfigConstants.DEFAULT_PARALLELISM_KEY,
+						ConfigConstants.DEFAULT_PARALLELISM));
 		this.ctx = ctx;
-		if (ctx.getParallelism() > 0) {
-			setParallelism(ctx.getParallelism());
-		} else {
-			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
-					ConfigConstants.DEFAULT_PARALLELISM));
-		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e299e84..ac3eadb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -112,9 +112,6 @@ public abstract class StreamExecutionEnvironment {
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
 
-	/** The default parallelism used when creating a local environment */
-	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-
 	// ------------------------------------------------------------------------
 
 	/** The execution configuration for this environment */
@@ -135,11 +132,23 @@ public abstract class StreamExecutionEnvironment {
 	/** The time characteristic used by the data streams */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
+	/** The parallelism to use when no parallelism is set on an operation. */
+	private final int defaultParallelism;
+
 
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
 
+
+	public StreamExecutionEnvironment() {
+		this(ConfigConstants.DEFAULT_PARALLELISM);
+	}
+
+	public StreamExecutionEnvironment(int defaultParallelism) {
+		this.defaultParallelism = defaultParallelism;
+	}
+
 	/**
 	 * Gets the config object.
 	 */
@@ -1513,7 +1522,7 @@ public abstract class StreamExecutionEnvironment {
 		if (transformations.size() <= 0) {
 			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
 		}
-		return StreamGraphGenerator.generate(this, transformations);
+		return StreamGraphGenerator.generate(this, transformations, defaultParallelism);
 	}
 
 	/**
@@ -1601,7 +1610,7 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A local execution environment.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment() {
-		return createLocalEnvironment(defaultLocalParallelism);
+		return new LocalStreamEnvironment();
 	}
 
 	/**
@@ -1610,14 +1619,12 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param parallelism
-	 * 		The parallelism for the local environment.
+	 * @param defaultParallelism The default parallelism for the local environment.
+	 * 
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
-		LocalStreamEnvironment env = new LocalStreamEnvironment();
-		env.setParallelism(parallelism);
-		return env;
+	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) {
+		return new LocalStreamEnvironment(defaultParallelism);
 	}
 
 	/**
@@ -1626,16 +1633,13 @@ public abstract class StreamExecutionEnvironment {
 	 * environment was created in. It will use the parallelism specified in the
 	 * parameter.
 	 *
-	 * @param parallelism
-	 * 		The parallelism for the local environment.
-	 * 	@param configuration
-	 * 		Pass a custom configuration into the cluster
+	 * @param defaultParallelism The parallelism for the local environment.
+	 * @param configuration Pass a custom configuration into the cluster
+	 *
 	 * @return A local execution environment with the specified parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
-		LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration);
-		currentEnvironment.setParallelism(parallelism);
-		return currentEnvironment;
+	public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) {
+		return new LocalStreamEnvironment(configuration, defaultParallelism);
 	}
 
 	/**
@@ -1660,7 +1664,6 @@ public abstract class StreamExecutionEnvironment {
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);
-		localEnv.setParallelism(defaultLocalParallelism);
 
 		return localEnv;
 	}
@@ -1746,28 +1749,6 @@ public abstract class StreamExecutionEnvironment {
 		return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
 	}
 
-	/**
-	 * Gets the default parallelism that will be used for the local execution environment created by
-	 * {@link #createLocalEnvironment()}.
-	 *
-	 * @return The default local parallelism
-	 */
-	@PublicEvolving
-	public static int getDefaultLocalParallelism() {
-		return defaultLocalParallelism;
-	}
-
-	/**
-	 * Sets the default parallelism that will be used for the local execution
-	 * environment created by {@link #createLocalEnvironment()}.
-	 *
-	 * @param parallelism The parallelism to use as the default local parallelism.
-	 */
-	@PublicEvolving
-	public static void setDefaultLocalParallelism(int parallelism) {
-		defaultLocalParallelism = parallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index b1521f5..9c676c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,18 +32,11 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 	private ExecutionEnvironment env;
 
 	protected StreamPlanEnvironment(ExecutionEnvironment env) {
-		super();
-		this.env = env;
+		super(GlobalConfiguration.loadConfiguration().getInteger(
+				ConfigConstants.DEFAULT_PARALLELISM_KEY,
+				ConfigConstants.DEFAULT_PARALLELISM));
 
-		int parallelism = env.getParallelism();
-		if (parallelism > 0) {
-			setParallelism(parallelism);
-		} else {
-			// determine parallelism
-			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
-					ConfigConstants.DEFAULT_PARALLELISM));
-		}
+		this.env = env;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index c1775e4..c8d5340 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -92,12 +92,14 @@ public class StreamGraph extends StreamingPlan {
 	private AbstractStateBackend stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
+	private final int defaultParallelism;
 
-	public StreamGraph(StreamExecutionEnvironment environment) {
+	public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) {
 		this.environment = environment;
 		this.executionConfig = environment.getConfig();
 		this.checkpointConfig = environment.getCheckpointConfig();
 
+		this.defaultParallelism = defaultParallelism;
 		// create an empty new stream graph.
 		clear();
 	}
@@ -607,7 +609,7 @@ public class StreamGraph extends StreamingPlan {
 							+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
-		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
+		StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism);
 
 		return jobgraphGenerator.createJobGraph();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f4d4071..b3b6529 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -97,12 +97,11 @@ public class StreamGraphGenerator {
 	// we have loops, i.e. feedback edges.
 	private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
 
-
 	/**
 	 * Private constructor. The generator should only be invoked using {@link #generate}.
 	 */
-	private StreamGraphGenerator(StreamExecutionEnvironment env) {
-		this.streamGraph = new StreamGraph(env);
+	private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) {
+		this.streamGraph = new StreamGraph(env, defaultParallelism);
 		this.streamGraph.setChaining(env.isChainingEnabled());
 		this.streamGraph.setStateBackend(env.getStateBackend());
 		this.env = env;
@@ -119,8 +118,11 @@ public class StreamGraphGenerator {
 	 *
 	 * @return The generated {@code StreamGraph}
 	 */
-	public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
-		return new StreamGraphGenerator(env).generateInternal(transformations);
+	public static StreamGraph generate(
+			StreamExecutionEnvironment env,
+			List<StreamTransformation<?>> transformations,
+			int defaultParallelism) {
+		return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5c1e1ac..60f8faa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -92,10 +93,13 @@ public class StreamingJobGraphGenerator {
 	private final StreamGraphHasher defaultStreamGraphHasher;
 	private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-	public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+	private final int defaultParallelism;
+
+	public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) {
 		this.streamGraph = streamGraph;
 		this.defaultStreamGraphHasher = new StreamGraphHasherV2();
 		this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+		this.defaultParallelism = defaultParallelism;
 	}
 
 	private void init() {
@@ -338,12 +342,12 @@ public class StreamingJobGraphGenerator {
 
 		int parallelism = streamNode.getParallelism();
 
-		if (parallelism > 0) {
-			jobVertex.setParallelism(parallelism);
-		} else {
-			parallelism = jobVertex.getParallelism();
+		if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+			parallelism = defaultParallelism;
 		}
 
+		jobVertex.setParallelism(parallelism);
+
 		jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index 3fc1344..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class StreamExecutionEnvironmentTest {
-
-	@Test
-	public void fromElementsWithBaseTypeTest1() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void fromElementsWithBaseTypeTest2() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testFromCollectionParallelism() {
-		try {
-			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-			try {
-				dataStream1.setParallelism(4);
-				fail("should throw an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			dataStream1.addSink(new DiscardingSink<Integer>());
-	
-			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-					typeInfo).setParallelism(4);
-
-			dataStream2.addSink(new DiscardingSink<Integer>());
-
-			env.getExecutionPlan();
-
-			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-			assertEquals("Parallelism of parallel collection source must be 4.",
-					4, 
-					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSources() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-		DataStreamSource<Integer> src1 = env.addSource(srcFun);
-		src1.addSink(new DiscardingSink<Integer>());
-		assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-		List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
-
-		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
-
-		DataStreamSource<Long> src4 = env.fromCollection(list);
-		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
-	}
-
-	@Test
-	public void testParallelismBounds() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-			}
-
-			@Override
-			public void cancel() {
-			}
-		};
-
-
-		SingleOutputStreamOperator<Object> operator =
-				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<Object> out) throws Exception {
-
-			}
-		});
-
-		// default value for max parallelism
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for parallelism 1
-		try {
-			operator.setParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for parallelism 2
-		operator.setParallelism(1);
-		Assert.assertEquals(1, operator.getParallelism());
-
-		// bounds for parallelism 3
-		operator.setParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getParallelism());
-
-		// default value after generating
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
-
-		// configured value after generating
-		env.setMaxParallelism(42);
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
-
-		// bounds configured parallelism 1
-		try {
-			env.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds configured parallelism 2
-		try {
-			env.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 1
-		try {
-			operator.setMaxParallelism(0);
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 2
-		try {
-			operator.setMaxParallelism(1 + (1 << 15));
-			Assert.fail();
-		} catch (IllegalArgumentException expected) {
-		}
-
-		// bounds for max parallelism 3
-		operator.setMaxParallelism(1);
-		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
-
-		// bounds for max parallelism 4
-		operator.setMaxParallelism(1 << 15);
-		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
-
-		// override config
-		env.getStreamGraph().getJobGraph();
-		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
-	}
-
-	/////////////////////////////////////////////////////////////
-	// Utilities
-	/////////////////////////////////////////////////////////////
-
-
-	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
-		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
-		StreamGraph streamGraph = env.getStreamGraph();
-		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-		dataStreamSource.addSink(new DiscardingSink<T>());
-		AbstractUdfStreamOperator<?, ?> operator =
-				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
-		return (SourceFunction<T>) operator.getUserFunction();
-	}
-
-	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
-		private static final long serialVersionUID = 1312752876092210499L;
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Iterator<T>[] split(int numPartitions) {
-			return (Iterator<T>[]) new Iterator<?>[0];
-		}
-
-		@Override
-		public int getMaximumNumberOfSplits() {
-			return 0;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return false;
-		}
-
-		@Override
-		public T next() {
-			throw new NoSuchElementException();
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	public static class ParentClass {
-		int num;
-		String string;
-		public ParentClass(int num, String string) {
-			this.num = num;
-			this.string = string;
-		}
-	}
-
-	public static class SubClass extends ParentClass{
-		public SubClass(int num, String string) {
-			super(num, string);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..d29c833
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class StreamExecutionEnvironmentTest {
+
+	@Test
+	public void fromElementsWithBaseTypeTest1() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void fromElementsWithBaseTypeTest2() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFromCollectionParallelism() {
+		try {
+			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+			try {
+				dataStream1.setParallelism(4);
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+
+			dataStream1.addSink(new DiscardingSink<Integer>());
+	
+			DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+					typeInfo).setParallelism(4);
+
+			dataStream2.addSink(new DiscardingSink<Integer>());
+
+			env.getExecutionPlan();
+
+			assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+			assertEquals("Parallelism of parallel collection source must be 4.",
+					4, 
+					env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSources() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+		DataStreamSource<Integer> src1 = env.addSource(srcFun);
+		src1.addSink(new DiscardingSink<Integer>());
+		assertEquals(srcFun, getFunctionFromDataSource(src1));
+
+		List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+		assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource);
+
+		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+		assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction);
+
+		DataStreamSource<Long> src4 = env.fromCollection(list);
+		assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
+	}
+
+	@Test
+	public void testDefaultParallelismIsDefault() {
+		assertEquals(
+				ExecutionConfig.PARALLELISM_DEFAULT,
+				StreamExecutionEnvironment.createLocalEnvironment().getParallelism());
+
+		assertEquals(
+				ExecutionConfig.PARALLELISM_DEFAULT,
+				StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234).getParallelism());
+
+		StreamExecutionEnvironment contextEnv = new StreamContextEnvironment(
+				new ContextEnvironment(
+						mock(ClusterClient.class),
+						Collections.<URL>emptyList(),
+						Collections.<URL>emptyList(),
+						this.getClass().getClassLoader(),
+						null));
+
+		assertEquals(
+				ExecutionConfig.PARALLELISM_DEFAULT,
+				contextEnv.getParallelism());
+	}
+
+	@Test
+	public void testParallelismBounds() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Integer> ctx) throws Exception {
+			}
+
+			@Override
+			public void cancel() {
+			}
+		};
+
+
+		SingleOutputStreamOperator<Object> operator =
+				env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+			}
+		});
+
+		// default value for max parallelism
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for parallelism 1
+		try {
+			operator.setParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for parallelism 2
+		operator.setParallelism(1);
+		Assert.assertEquals(1, operator.getParallelism());
+
+		// bounds for parallelism 3
+		operator.setParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getParallelism());
+
+		// default value after generating
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+		// configured value after generating
+		env.setMaxParallelism(42);
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+		// bounds configured parallelism 1
+		try {
+			env.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds configured parallelism 2
+		try {
+			env.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 1
+		try {
+			operator.setMaxParallelism(0);
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 2
+		try {
+			operator.setMaxParallelism(1 + (1 << 15));
+			Assert.fail();
+		} catch (IllegalArgumentException expected) {
+		}
+
+		// bounds for max parallelism 3
+		operator.setMaxParallelism(1);
+		Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+		// bounds for max parallelism 4
+		operator.setMaxParallelism(1 << 15);
+		Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+		// override config
+		env.getStreamGraph().getJobGraph();
+		Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+
+	private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) {
+		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+		StreamGraph streamGraph = env.getStreamGraph();
+		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+		dataStreamSource.addSink(new DiscardingSink<T>());
+		AbstractUdfStreamOperator<?, ?> operator =
+				(AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource);
+		return (SourceFunction<T>) operator.getUserFunction();
+	}
+
+	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
+		private static final long serialVersionUID = 1312752876092210499L;
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Iterator<T>[] split(int numPartitions) {
+			return (Iterator<T>[]) new Iterator<?>[0];
+		}
+
+		@Override
+		public int getMaximumNumberOfSplits() {
+			return 0;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return false;
+		}
+
+		@Override
+		public T next() {
+			throw new NoSuchElementException();
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	public static class ParentClass {
+		int num;
+		String string;
+		public ParentClass(int num, String string) {
+			this.num = num;
+			this.string = string;
+		}
+	}
+
+	public static class SubClass extends ParentClass{
+		public SubClass(int num, String string) {
+			super(num, string);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6d2fcaa..abf51ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -112,10 +112,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	@Test
 	public void testDisabledCheckpointing() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamGraph streamGraph = new StreamGraph(env);
+		StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */);
 		assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
+		StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
 		JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
 		JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
@@ -137,7 +137,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 				}
 			})
 			.print();
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		JobVertex sourceVertex = verticesSorted.get(0);
@@ -224,7 +224,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		});
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		JobVertex sourceMapFilterVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
 		JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
@@ -291,7 +291,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		}).disableChaining().name("test_sink");
 		sinkMethod.invoke(sink, resource5);
 
-		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph();
 
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			if (jobVertex.getName().contains("test_source")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 734879d..4b479f3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -123,7 +123,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -218,7 +218,7 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -246,6 +246,10 @@ public class FoldApplyProcessWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1);
+		}
+
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index fecd440..6ddca34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -117,7 +117,7 @@ public class FoldApplyWindowFunctionTest {
 
 		transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+		StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */);
 
 		List<Integer> result = new ArrayList<>();
 		List<Integer> input = new ArrayList<>();
@@ -140,6 +140,10 @@ public class FoldApplyWindowFunctionTest {
 
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1);
+		}
+
 		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 22f1264..60798e0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -673,23 +673,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
-  /**
-   * Sets the default parallelism that will be used for the local execution
-   * environment created by [[createLocalEnvironment()]].
-   *
-   * @param parallelism The default parallelism to use for local execution.
-   */
-  @PublicEvolving
-  def setDefaultLocalParallelism(parallelism: Int) : Unit =
-    JavaEnv.setDefaultLocalParallelism(parallelism)
-
-  /**
-   * Gets the default parallelism that will be used for the local execution environment created by
-   * [[createLocalEnvironment()]].
-   */
-  @PublicEvolving
-  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
-  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -711,13 +694,14 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment will run the
    * program in a multi-threaded fashion in the same JVM as the environment was created in.
-   *
-   * This method sets the environment's default parallelism to given parameter, which
-   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
+  def createLocalEnvironment(parallelism: Int = -1):
       StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    if (parallelism == -1) {
+      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
+    } else {
+      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 60c609d..08153be 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,9 +255,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // default parallelism is only actualized when transforming to JobGraph
+    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -272,9 +273,11 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // setting a parallelism on the env/in the ExecutionConfig means that operators
+    // pick it up when being instantiated
+    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 64c68dc..90d8790 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,6 +36,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	
 
 	public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
+		super(parallelism);
 		this.executor = Preconditions.checkNotNull(executor);
 		setParallelism(parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9cfae899/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index c56fa91..883f4b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -381,6 +381,10 @@ public class AccumulatorLiveITCase {
 	 */
 	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
+		public DummyStreamExecutionEnvironment() {
+			super(1 /* default parallelism */);
+		}
+
 		@Override
 		public JobExecutionResult execute() throws Exception {
 			return execute("default");