Posted to commits@flink.apache.org by rm...@apache.org on 2015/05/15 14:13:59 UTC

flink git commit: [FLINK-1525] Implement a tool for parsing input parameters and provide facilities to use them in the system.

Repository: flink
Updated Branches:
  refs/heads/master a35c7201e -> b335f5879


[FLINK-1525] Implement a tool for parsing input parameters and provide facilities to use them in the system.

This closes #664


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b335f587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b335f587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b335f587

Branch: refs/heads/master
Commit: b335f5879f534a543d64aafca88e0d512be1b710
Parents: a35c720
Author: Robert Metzger <rm...@apache.org>
Authored: Thu May 7 22:49:33 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri May 15 14:12:55 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html                      |   1 +
 docs/apis/best_practices.md                     | 157 +++++++++
 docs/apis/programming_guide.md                  |  67 +++-
 .../flink/api/common/ExecutionConfig.java       |  36 ++
 .../flink/configuration/Configuration.java      |  19 +-
 .../apache/flink/api/java/ClosureCleaner.java   |   6 +-
 .../flink/api/java/utils/ParameterTool.java     | 350 +++++++++++++++++++
 .../flink/api/java/utils/ParameterToolTest.java | 183 ++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |  19 +-
 .../jobmanager/web/JobManagerInfoServlet.java   |  26 ++
 .../resources/web-docs-infoserver/analyze.html  |  11 +-
 .../resources/web-docs-infoserver/history.html  |   2 +-
 .../web-docs-infoserver/js/analyzer.js          |  19 +
 .../flink/api/scala/ExecutionEnvironment.scala  |  15 +-
 14 files changed, 897 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index dd89d6c..6628e09 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -69,6 +69,7 @@ under the License.
 
                 <li class="divider"></li>
                 <li><a href="{{ apis }}/dataset_transformations.html">Dataset Transformations</a></li>
+                <li><a href="{{ apis }}/best_practices.html">Best Practices</a></li>
                 <li><a href="{{ apis }}/example_connectors.html">Connectors</a></li>
                 <li><a href="{{ apis }}/examples.html">Examples</a></li>
                 <li><a href="{{ apis }}/local_execution.html">Local Execution</a></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/docs/apis/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
new file mode 100644
index 0000000..da8d264
--- /dev/null
+++ b/docs/apis/best_practices.md
@@ -0,0 +1,157 @@
+---
+title: "Best Practices"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<a href="#top"></a>
+
+
+This page contains a collection of best practices for Flink programmers, showing how to solve frequently encountered problems.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+## Parsing command line arguments and passing them around in your Flink application
+
+
+Almost all Flink applications, both batch and streaming, rely on external configuration parameters.
+They are used, for example, to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application-specific parameters (often used within user functions).
+
+Since version 0.9 we provide a simple utility called `ParameterTool` that offers at least some basic tooling for solving these problems.
+
+Please note that you don't have to use the `ParameterTool` explained here. Other frameworks such as [Commons CLI](https://commons.apache.org/proper/commons-cli/) and
+[argparse4j](http://argparse4j.sourceforge.net/) work well with Flink as well.
+
+
+### Getting your configuration values into the `ParameterTool`
+
+The `ParameterTool` provides a set of predefined static methods for reading the configuration. The tool internally expects a `Map<String, String>`, so it's very easy to integrate it with your own configuration style.
+
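+For example, you can build the map yourself and use the `fromMap()` method introduced in this commit (a minimal sketch; the keys are illustrative):
+
+{% highlight java %}
+Map<String, String> map = new HashMap<String, String>();
+map.put("input", "hdfs:///mydata");
+map.put("elements", "42");
+ParameterTool parameter = ParameterTool.fromMap(map);
+{% endhighlight %}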
+
+#### From `.properties` files
+
+The following method will read a [Properties](https://docs.oracle.com/javase/tutorial/essential/environment/properties.html) file and provide the key/value pairs:
+{% highlight java %}
+String propertiesFile = "/home/sam/flink/myjob.properties";
+ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
+{% endhighlight %}
+
+
+#### From the command line arguments
+
+This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
+{% highlight java %}
+public static void main(String[] args) {
+	ParameterTool parameter = ParameterTool.fromArgs(args);
+	// .. regular code ..
+}
+{% endhighlight %}
+
+
+#### From system properties
+
+When starting a JVM, you can pass system properties to it: `-Dinput=hdfs:///mydata`. You can also initialize the `ParameterTool` from these system properties:
+
+{% highlight java %}
+ParameterTool parameter = ParameterTool.fromSystemProperties();
+{% endhighlight %}
+
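+Since each of these sources yields a `ParameterTool`, they can also be combined via the tool's `mergeWith()` method. A minimal sketch (assuming `args` from `main()`; the file path is illustrative):
+
+{% highlight java %}
+ParameterTool fromFile = ParameterTool.fromPropertiesFile("/home/sam/flink/myjob.properties");
+ParameterTool fromArgs = ParameterTool.fromArgs(args);
+// values from the command line override values from the properties file
+ParameterTool parameter = fromFile.mergeWith(fromArgs);
+{% endhighlight %}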
+
+### Using the parameters in your Flink program
+
+Now that we've got the parameters from somewhere (see above), we can use them in various ways.
+
+**Directly from the `ParameterTool`**
+
+The `ParameterTool` itself has methods for accessing the values.
+{% highlight java %}
+ParameterTool parameters = // ...
+parameters.getRequired("input");
+parameters.get("output", "myDefaultValue");
+parameters.getLong("expectedCount", -1L);
+parameters.getNumberOfParameters();
+// .. there are more methods available.
+{% endhighlight %}
+
+You can use the return values of these methods directly in the main() method (i.e., in the client submitting the application).
+For example, you could set the parallelism of an operator like this:
+
+{% highlight java %}
+ParameterTool parameters = ParameterTool.fromArgs(args);
+int parallelism = parameters.getInt("mapParallelism", 2);
+DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
+{% endhighlight %}
+
+Since the `ParameterTool` is serializable, you can pass it to the functions themselves:
+
+{% highlight java %}
+ParameterTool parameters = ParameterTool.fromArgs(args);
+DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
+{% endhighlight %}
+
+and then use it inside the function to read values from the command line, as in the sketch below.
+
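+A minimal sketch of a function accepting the `ParameterTool` through its constructor (the `Tokenizer` body here is illustrative):
+
+{% highlight java %}
+public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+	private final ParameterTool parameters;
+
+	public Tokenizer(ParameterTool parameters) {
+		this.parameters = parameters;
+	}
+
+	@Override
+	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+		// values parsed on the client are available in all parallel instances
+		long expected = parameters.getLong("expectedCount", -1L);
+		// .. tokenize 'value' and emit counts ..
+	}
+}
+{% endhighlight %}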
+
+#### Passing it as a `Configuration` object to single functions
+
+The example below shows how to pass the parameters as a `Configuration` object to a user defined function.
+
+{% highlight java %}
+ParameterTool parameters = ParameterTool.fromArgs(args);
+DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration());
+{% endhighlight %}
+
+In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
+
+{% highlight java %}
+public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		parameters.getInteger("myInt", -1);
+		// .. do more ..
+	}
+}
+{% endhighlight %}
+
+
+#### Register the parameters globally
+
+Parameters registered as [global job parameters](programming_guide.html#passing-parameters-to-functions) at the `ExecutionConfig` can be accessed from the JobManager web interface and from all user-defined functions.
+
+**Register the parameters globally**
+
+{% highlight java %}
+ParameterTool parameters = ParameterTool.fromArgs(args);
+
+// set up the execution environment
+final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setGlobalJobParameters(parameters);
+{% endhighlight %}
+
+Access them in any rich user function:
+
+{% highlight java %}
+public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+
+	@Override
+	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+		parameters.getRequired("input");
+		// .. do more ..
+{% endhighlight %}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 46dcc28..14e5163 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -231,6 +231,7 @@ getExecutionEnvironment()
 
 createLocalEnvironment()
 createLocalEnvironment(int parallelism)
+createLocalEnvironment(Configuration customConfiguration)
 
 createRemoteEnvironment(String host, int port, String... jarFiles)
 createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
@@ -331,6 +332,7 @@ obtain one using these static methods on class `ExecutionEnvironment`:
 def getExecutionEnvironment
 
 def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
+def createLocalEnvironment(customConfiguration: Configuration)
 
 def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
 def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
@@ -665,6 +667,18 @@ DataSet<Integer> result = in.partitionByHash(0)
       </td>
     </tr>
     <tr>
+      <td><strong>Custom Partitioning</strong></td>
+      <td>
+        <p>Manually specify a partitioning over the data.
+          <br/>
+          <i>Note</i>: This method works only on single-field keys.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
       <td><strong>Sort Partition</strong></td>
       <td>
         <p>Locally sorts all partitions of a data set on a specified field in a specified order. 
@@ -2588,8 +2602,9 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
 Passing Parameters to Functions
 -------------------
 
-Parameters can be passed to functions using either the constructor or the `withParameters(Configuration)` method. The parameters are serialized
-as part of the function object and shipped to all parallel task instances.
+Parameters can be passed to functions using either the constructor or the `withParameters(Configuration)` method. The parameters are serialized as part of the function object and shipped to all parallel task instances.
+
+Check also the [best practices guide on how to pass command line arguments to functions](best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application).
 
 #### Via Constructor
 
@@ -2680,6 +2695,54 @@ toFilter.filter(new RichFilterFunction[Int]() {
 </div>
 </div>
 
+#### Globally via the `ExecutionConfig`
+
+Flink also allows you to pass custom configuration values to the `ExecutionConfig` of the environment. Since the execution config is accessible in all (rich) user functions, the custom configuration will be available globally in all functions.
+
+
+**Setting a custom global configuration**
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+Configuration conf = new Configuration();
+conf.setString("mykey", "myvalue");
+final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setGlobalJobParameters(conf);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val conf = new Configuration()
+conf.setString("mykey", "myvalue")
+env.getConfig.setGlobalJobParameters(conf)
+{% endhighlight %}
+</div>
+</div>
+
+Please note that you can also pass a custom class extending the `ExecutionConfig.GlobalJobParameters` class as the global job parameters to the execution config. You can override the `Map<String, String> toMap()` method, which will in turn show the values from the configuration in the web frontend.
+
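+A minimal sketch of such a custom class (the class and field names are illustrative, not part of Flink):
+
+{% highlight java %}
+public static class MyJobParameters extends ExecutionConfig.GlobalJobParameters {
+    private final String inputPath;
+
+    public MyJobParameters(String inputPath) {
+      this.inputPath = inputPath;
+    }
+
+    @Override
+    public Map<String, String> toMap() {
+      // these key/value pairs are displayed as the user config in the web frontend
+      Map<String, String> map = new HashMap<String, String>();
+      map.put("inputPath", inputPath);
+      return map;
+    }
+}
+{% endhighlight %}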
+
+**Accessing values from the global configuration**
+
+Objects in the global job parameters are accessible in many places in the system. All user functions implementing a `Rich*Function` interface have access through the runtime context.
+
+{% highlight java %}
+public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
+
+    private String mykey;
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      super.open(parameters);
+      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+      Configuration globConf = (Configuration) globalParams;
+      mykey = globConf.getString("mykey", null);
+    }
+    // ... more here ...
+{% endhighlight %}
+
 [Back to top](#top)
 
 Program Packaging & Distributed Execution

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 8df6aab..51ffad7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -23,6 +23,7 @@ import com.esotericsoftware.kryo.Serializer;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A config to define the behavior of the program execution. It allows to define (among other
@@ -79,6 +80,8 @@ public class ExecutionConfig implements Serializable {
 	/** If set to true, progress updates are printed to System.out during execution */
 	private boolean printProgressDuringExecution = true;
 
+	private GlobalJobParameters globalJobParameters = null;
+
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in lists to ensure they are registered in order in all kryo instances.
 
@@ -342,6 +345,18 @@ public class ExecutionConfig implements Serializable {
 		return this.printProgressDuringExecution;
 	}
 
+	public GlobalJobParameters getGlobalJobParameters() {
+		return globalJobParameters;
+	}
+
+	/**
+	 * Register a custom, serializable user configuration object.
+	 * @param globalJobParameters Custom user configuration object
+	 */
+	public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
+		this.globalJobParameters = globalJobParameters;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Registry for types and serializers
 	// --------------------------------------------------------------------------------------------
@@ -529,6 +544,8 @@ public class ExecutionConfig implements Serializable {
 	}
 
 
+	// ------------------------------ Utilities  ----------------------------------
+
 	public static class Entry<K, V> implements Serializable {
 
 		private static final long serialVersionUID = 1L;
@@ -585,4 +602,23 @@ public class ExecutionConfig implements Serializable {
 					'}';
 		}
 	}
+
+	/**
+	 * Base class for custom user configuration objects registered at the execution config.
+	 *
+	 * This user config is accessible at runtime through
+	 * getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
+	 */
+	public static class GlobalJobParameters implements Serializable {
+		/**
+		 * Convert the user configuration into a Map<String, String> representation.
+		 * This can be used by the runtime, for example for presenting the user config in the web frontend.
+		 *
+		 * @return Key/value representation of the user configuration, or null if none is available.
+		 */
+		public Map<String, String> toMap() {
+			return null;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 62598a4..c095b5f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * Lightweight configuration object which can store key/value pairs.
  */
 @SuppressWarnings("EqualsBetweenInconvertibleTypes")
-public class Configuration implements IOReadableWritable, java.io.Serializable, Cloneable {
+public class Configuration extends ExecutionConfig.GlobalJobParameters implements IOReadableWritable, java.io.Serializable, Cloneable {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -454,7 +455,21 @@ public class Configuration implements IOReadableWritable, java.io.Serializable,
 			return this.confData.containsKey(key);
 		}
 	}
-	
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Map<String, String> toMap() {
+		synchronized (this.confData){
+			Map<String, String> ret = new HashMap<String, String>(this.confData.size());
+			for(Map.Entry<String, Object> entry : confData.entrySet()) {
+				ret.put(entry.getKey(), entry.getValue().toString());
+			}
+			return ret;
+		}
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	
 	private <T> void setValueInternal(String key, T value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index d8d4074..ec1ebd6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -90,11 +90,11 @@ public class ClosureCleaner {
 	}
 
 
-	public static void ensureSerializable(Object func) {
+	public static void ensureSerializable(Object obj) {
 		try {
-			InstantiationUtil.serializeObject(func);
+			InstantiationUtil.serializeObject(obj);
 		} catch (Exception e) {
-			throw new InvalidProgramException("Task " + func + " not serializable: ", e);
+			throw new InvalidProgramException("Object " + obj + " not serializable", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
new file mode 100644
index 0000000..d1c1f49
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
+	protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
+	protected static final String DEFAULT_UNDEFINED = "<undefined>";
+
+
+	// ------------------ Constructors ------------------------
+
+	public static ParameterTool fromArgs(String[] args) {
+		Map<String, String> map = new HashMap<String, String>(args.length / 2);
+
+		String key = null;
+		String value = null;
+		boolean expectValue = false;
+		for(String arg: args) {
+			// check for -- argument
+			if(arg.startsWith("--")) {
+				if(expectValue) {
+					// we encountered a new key while expecting a value --> the current key has no value
+					if(value != null) {
+						throw new IllegalStateException("Unexpected state");
+					}
+					map.put(key, NO_VALUE_KEY);
+					// key will be overwritten in the next step
+				}
+				key = arg.substring(2);
+				expectValue = true;
+			} // check for - argument
+			else if(arg.startsWith("-")) {
+				// we are waiting for a value, so this is a - prefixed value (negative number)
+				if(expectValue) {
+
+					if(NumberUtils.isNumber(arg)) {
+						// negative number
+						value = arg;
+						expectValue = false;
+					} else {
+						if(value != null) {
+							throw new IllegalStateException("Unexpected state");
+						}
+						// We waited for a value but found a new key, so the previous key doesn't have a value.
+						map.put(key, NO_VALUE_KEY);
+						key = arg.substring(1);
+						expectValue = true;
+					}
+				} else {
+					// we are not waiting for a value, so it's an argument
+					key = arg.substring(1);
+					expectValue = true;
+				}
+			} else {
+				if(expectValue) {
+					value = arg;
+					expectValue = false;
+				} else {
+					throw new RuntimeException("Error parsing arguments '"+ Arrays.toString(args)+"' on '"+arg+"'. Unexpected value. Please prefix values with -- or -.");
+				}
+			}
+
+			if(value == null && key == null) {
+				throw new IllegalStateException("Value and key can not be null at the same time");
+			}
+			if(key != null && value == null && !expectValue) {
+				throw new IllegalStateException("Value expected but flag not set");
+			}
+			if(key != null && value != null) {
+				map.put(key, value);
+				key = null;
+				value = null;
+				expectValue = false;
+			}
+			if(key != null && key.length() == 0) {
+				throw new IllegalArgumentException("The input "+Arrays.toString(args)+" contains an empty argument");
+			}
+
+			if(key != null && !expectValue) {
+				map.put(key, NO_VALUE_KEY);
+				key = null;
+				expectValue = false;
+			}
+		}
+		if(key != null) {
+			map.put(key, NO_VALUE_KEY);
+		}
+
+		return fromMap(map);
+	}
+
+	public static ParameterTool fromPropertiesFile(String path) throws IOException {
+		File propertiesFile = new File(path);
+		if(!propertiesFile.exists()) {
+			throw new FileNotFoundException("Properties file "+path+" does not exist");
+		}
+		Properties props = new Properties();
+		props.load(new FileInputStream(propertiesFile));
+		return fromMap((Map)props);
+	}
+
+	public static ParameterTool fromMap(Map<String, String> map) {
+		Preconditions.checkNotNull(map, "Unable to initialize from empty map");
+		return new ParameterTool(map);
+	}
+
+	public static ParameterTool fromSystemProperties() {
+		return fromMap((Map) System.getProperties());
+	}
+
+	// ------------------ ParameterTool ------------------------
+	protected final Map<String, String> data;
+	protected final HashMap<String, String> defaultData;
+
+	private ParameterTool(Map<String, String> data) {
+		this.data = new HashMap<String, String>(data);
+		this.defaultData = new HashMap<String, String>();
+	}
+
+	// ------------------ Get data from the util ----------------
+
+	public int getNumberOfParameters() {
+		return data.size();
+	}
+
+	public String get(String key) {
+		addToDefaults(key, null);
+		return data.get(key);
+	}
+
+	public String getRequired(String key) {
+		addToDefaults(key, null);
+		String value = get(key);
+		if(value == null) {
+			throw new RuntimeException("No data for required key '"+key+"'");
+		}
+		return value;
+	}
+
+
+	public String get(String key, String defaultValue) {
+		addToDefaults(key, defaultValue);
+		String value = get(key);
+		if(value == null) {
+			return defaultValue;
+		} else {
+			return value;
+		}
+	}
+
+	/**
+	 * Check whether a value is set for the given key.
+	 */
+	public boolean has(String key) {
+		addToDefaults(key, null);
+		return data.containsKey(key);
+	}
+
+	// -------------- Integer
+
+	public int getInt(String key) {
+		addToDefaults(key, null);
+		String value = getRequired(key);
+		return Integer.valueOf(value);
+	}
+
+	public int getInt(String key, int defaultValue) {
+		addToDefaults(key, Integer.toString(defaultValue));
+		String value = get(key);
+		if(value == null) {
+			return defaultValue;
+		} else {
+			return Integer.valueOf(value);
+		}
+	}
+
+	// -------------- LONG
+	/**
+	 * Get the long value for the given key.
+	 * The method fails if the key is not specified.
+	 * @param key Name of the key
+	 * @return The value for the given key, as a long
+	 */
+	public long getLong(String key) {
+		addToDefaults(key, null);
+		String value = getRequired(key);
+		return Long.valueOf(value);
+	}
+
+	public long getLong(String key, long defaultValue) {
+		addToDefaults(key, Long.toString(defaultValue));
+		String value = get(key);
+		if(value == null) {
+			return defaultValue;
+		} else {
+			return Long.valueOf(value);
+		}
+	}
+
+	// -------------- FLOAT
+
+	public float getFloat(String key) {
+		addToDefaults(key, null);
+		String value = getRequired(key);
+		return Float.valueOf(value);
+	}
+
+	public float getFloat(String key, float defaultValue) {
+		addToDefaults(key, Float.toString(defaultValue));
+		String value = get(key);
+		if(value == null) {
+			return defaultValue;
+		} else {
+			return Float.valueOf(value);
+		}
+	}
+
+	// -------------- DOUBLE
+
+	public double getDouble(String key) {
+		addToDefaults(key, null);
+		String value = getRequired(key);
+		return Double.valueOf(value);
+	}
+
+	public double getDouble(String key, double defaultValue) {
+		addToDefaults(key, Double.toString(defaultValue));
+		String value = get(key);
+		if(value == null) {
+			return defaultValue;
+		} else {
+			return Double.valueOf(value);
+		}
+	}
+
+	// --------------- Internals
+
+	protected void addToDefaults(String key, String value) {
+		String currentValue = defaultData.get(key);
+		if(currentValue == null) {
+			if(value == null) {
+				value = DEFAULT_UNDEFINED;
+			}
+			defaultData.put(key, value);
+		} else {
+			// there is already an entry for this key. Check if the value is the undefined
+			if(currentValue.equals(DEFAULT_UNDEFINED) && value != null) {
+				// update key with better default value
+				defaultData.put(key, value);
+			}
+		}
+	}
+
+	// ------------------------- Export to different targets -------------------------
+
+	public Configuration getConfiguration() {
+		Configuration conf = new Configuration();
+		for(Map.Entry<String, String> entry: data.entrySet()) {
+			conf.setString(entry.getKey(), entry.getValue());
+		}
+		return conf;
+	}
+
+	public Properties getProperties() {
+		Properties props = new Properties();
+		props.putAll(this.data);
+		return props;
+	}
+
+
+	/**
+	 * Create a properties file containing all known parameters (call this after the last get*() call).
+	 * Default values are written where available.
+	 *
+	 * Use this method to create a properties file skeleton.
+	 *
+	 * @param pathToFile Location of the default properties file.
+	 */
+	public void createPropertiesFile(String pathToFile) throws IOException {
+		createPropertiesFile(pathToFile, true);
+	}
+
+	public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
+		File file = new File(pathToFile);
+		if(file.exists()) {
+			if(overwrite) {
+				file.delete();
+			} else {
+				throw new RuntimeException("File "+pathToFile+" exists and overwriting is not allowed");
+			}
+		}
+		Properties defaultProps = new Properties();
+		defaultProps.putAll(this.defaultData);
+		defaultProps.store(new FileOutputStream(file), "Default file created by Flink's ParameterTool.createPropertiesFile()");
+	}
+
+	@Override
+	protected Object clone() throws CloneNotSupportedException {
+		return new ParameterTool(this.data);
+	}
+
+
+
+	// ------------------------- Interaction with other ParameterTool objects -------------------------
+
+	public ParameterTool mergeWith(ParameterTool other) {
+		ParameterTool ret = new ParameterTool(this.data);
+		ret.data.putAll(other.data);
+		return ret;
+	}
+
+	// ------------------------- ExecutionConfig.GlobalJobParameters -------------------------
+
+	@Override
+	public Map<String, String> toMap() {
+		return data;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
new file mode 100644
index 0000000..c660b7a
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+public class ParameterToolTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	// ----- Parser tests -----------------
+
+	@Test(expected = RuntimeException.class)
+	public void testIllegalArgs() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"berlin"});
+		Assert.assertEquals(0, parameter.getNumberOfParameters());
+	}
+
+	@Test
+	public void testNoVal() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-berlin"});
+		Assert.assertEquals(1, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("berlin"));
+	}
+
+	@Test
+	public void testNoValDouble() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+		Assert.assertEquals(1, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("berlin"));
+	}
+
+	@Test
+	public void testMultipleNoVal() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "--b", "--c", "--d", "--e", "--f"});
+		Assert.assertEquals(6, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("a"));
+		Assert.assertTrue(parameter.has("b"));
+		Assert.assertTrue(parameter.has("c"));
+		Assert.assertTrue(parameter.has("d"));
+		Assert.assertTrue(parameter.has("e"));
+		Assert.assertTrue(parameter.has("f"));
+	}
+
+	@Test
+	public void testMultipleNoValMixed() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "-b", "-c", "-d", "--e", "--f"});
+		Assert.assertEquals(6, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("a"));
+		Assert.assertTrue(parameter.has("b"));
+		Assert.assertTrue(parameter.has("c"));
+		Assert.assertTrue(parameter.has("d"));
+		Assert.assertTrue(parameter.has("e"));
+		Assert.assertTrue(parameter.has("f"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testEmptyVal() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "-b", "--"});
+		Assert.assertEquals(2, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("a"));
+		Assert.assertTrue(parameter.has("b"));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testEmptyValShort() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "-b", "-"});
+		Assert.assertEquals(2, parameter.getNumberOfParameters());
+		Assert.assertTrue(parameter.has("a"));
+		Assert.assertTrue(parameter.has("b"));
+	}
+
+	@Test
+	public void testFromCliArgs() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--input", "myInput", "-expectedCount", "15", "--withoutValues", "--negativeFloat", "-0.58"});
+		Assert.assertEquals(4, parameter.getNumberOfParameters());
+		validate(parameter);
+		Assert.assertTrue(parameter.has("withoutValues"));
+		Assert.assertEquals(-0.58, parameter.getFloat("negativeFloat"), 0.1);
+	}
+
+	@Test
+	public void testFromPropertiesFile() throws IOException {
+		File propertiesFile = tmp.newFile();
+		Properties props = new Properties();
+		props.setProperty("input", "myInput");
+		props.setProperty("expectedCount", "15");
+		props.store(new FileOutputStream(propertiesFile), "Test properties");
+		ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile.getAbsolutePath());
+		Assert.assertEquals(2, parameter.getNumberOfParameters());
+		validate(parameter);
+	}
+
+	@Test
+	public void testFromMapOrProperties() {
+		Properties props = new Properties();
+		props.setProperty("input", "myInput");
+		props.setProperty("expectedCount", "15");
+		ParameterTool parameter = ParameterTool.fromMap((Map) props);
+		Assert.assertEquals(2, parameter.getNumberOfParameters());
+		validate(parameter);
+	}
+
+	/**
+	 * This is mainly meant to be used with -D arguments against the JVM.
+	 */
+	@Test
+	public void testSystemProperties() {
+		System.setProperty("input", "myInput");
+		System.setProperty("expectedCount", "15");
+		ParameterTool parameter = ParameterTool.fromSystemProperties();
+		validate(parameter);
+	}
+
+	@Test
+	public void testMerged() {
+		ParameterTool parameter1 = ParameterTool.fromArgs(new String[]{"--input", "myInput"});
+		System.setProperty("expectedCount", "15");
+		ParameterTool parameter2 = ParameterTool.fromSystemProperties();
+		ParameterTool parameter = parameter1.mergeWith(parameter2);
+		validate(parameter);
+	}
+
+	private void validate(ParameterTool parameter) {
+		ClosureCleaner.ensureSerializable(parameter);
+		Assert.assertEquals("myInput", parameter.getRequired("input"));
+		Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
+		Assert.assertEquals(null, parameter.get("whatever"));
+		Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L));
+
+		Configuration config = parameter.getConfiguration();
+		Assert.assertEquals(15L, config.getLong("expectedCount", -1L));
+
+		Properties props = parameter.getProperties();
+		Assert.assertEquals("myInput", props.getProperty("input"));
+		props = null;
+
+		// -------- test the default file creation ------------
+		try {
+			String pathToFile = tmp.newFile().getAbsolutePath();
+			parameter.createPropertiesFile(pathToFile);
+			Properties defaultProps = new Properties();
+			defaultProps.load(new FileInputStream(pathToFile));
+
+			Assert.assertEquals("myDefaultValue", defaultProps.get("output"));
+			Assert.assertEquals("-1", defaultProps.get("expectedCount"));
+			Assert.assertTrue(defaultProps.containsKey("input"));
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 90cf42e..a041f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import akka.actor.ActorRef;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -196,8 +198,10 @@ public class ExecutionGraph implements Serializable {
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointCoordinator checkpointCoordinator;
-	
-	
+
+	// ------ Fields that are only relevant for archived execution graphs ------------
+	private ExecutionConfig executionConfig = null;
+
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -617,7 +621,12 @@ public class ExecutionGraph implements Serializable {
 		if (!state.isTerminalState()) {
 			throw new IllegalStateException("Can only archive the job from a terminal state");
 		}
-
+		// "unpack" execution config before we throw away the usercode classloader.
+		try {
+			executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(jobConfiguration, ExecutionConfig.CONFIG_KEY, userClassLoader);
+		} catch (Exception e) {
+			LOG.warn("Error deserializing the execution config while archiving the execution graph", e);
+		}
 		// clear the non-serializable fields
 		userClassLoader = null;
 		scheduler = null;
@@ -634,6 +643,10 @@ public class ExecutionGraph implements Serializable {
 		executionListenerActors.clear();
 	}
 
+	public ExecutionConfig getExecutionConfig() {
+		return this.executionConfig;
+	}
+
 	/**
 	 * For testing: This waits until the job execution has finished.
 	 * @throws InterruptedException

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 144c477..a407d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -35,6 +36,7 @@ import akka.actor.ActorRef;
 
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchivedJobs;
@@ -398,6 +400,30 @@ public class JobManagerInfoServlet extends HttpServlet {
 			}
 			wrt.write("],");
 
+			// write user config
+			ExecutionConfig ec = graph.getExecutionConfig();
+			if(ec != null) {
+				wrt.write("\"executionConfig\": {");
+				wrt.write("\"Execution Mode\": \""+ec.getExecutionMode()+"\",");
+				wrt.write("\"Number of execution retries\": \""+ec.getNumberOfExecutionRetries()+"\",");
+				wrt.write("\"Job parallelism\": \""+ec.getParallelism()+"\",");
+				wrt.write("\"Object reuse mode\": \""+ec.isObjectReuseEnabled()+"\"");
+				ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
+				Map<String, String> ucVals = uc != null ? uc.toMap() : null;
+				if(ucVals != null) {
+					String ucString = "{";
+					int i = 0;
+					for (Map.Entry<String, String> ucVal: ucVals.entrySet()) {
+						ucString += "\""+ucVal.getKey()+"\":\""+ucVal.getValue()+"\"";
+						if (++i < ucVals.size()) {
+							ucString += ",\n";
+						}
+					}
+					wrt.write(", \"userConfig\": "+ucString+"}");
+				}
+				wrt.write("},");
+			}
+
 			// write accumulators
 			final Future<Object> response = Patterns.ask(jobmanager,
 					new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
index 2f72bd6..81e2436 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/analyze.html
@@ -57,7 +57,7 @@ under the License.
 		$.ajax({ url : "menu?get=analyze", type : "GET", cache: false, success : function(html) {
 			$("#side-menu").empty();
 			$("#side-menu").append(html);
-		}, dataType : "html",
+		}, dataType : "html"
 		});
 	});
   	</script>
@@ -145,6 +145,15 @@ under the License.
 		   </div>
 		</div>
 
+       <div class="panel panel-primary">
+         <div class="panel-heading">
+           <div class="panel-title">Job Configuration</div>
+         </div>
+         <div id="config" class="panel-body">
+           Unable to retrieve job configuration
+         </div>
+       </div>
+
     </div><!-- /#page-wrapper -->
 
     </div><!-- /#wrapper -->

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-runtime/src/main/resources/web-docs-infoserver/history.html
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/history.html b/flink-runtime/src/main/resources/web-docs-infoserver/history.html
index 1aae230..d3ee224 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/history.html
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/history.html
@@ -51,7 +51,7 @@ under the License.
 		$.ajax({ url : "menu?get=history", type : "GET", cache: false, success : function(html) {
 			$("#side-menu").empty();
 			$("#side-menu").append(html);
-		}, dataType : "html",
+		}, dataType : "html"
 		});
 	});
   	</script>

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
index f93db1c..3934017 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/analyzer.js
@@ -100,6 +100,25 @@ function analyzeTime(json, stacked) {
 			$("#accumulators").html(accuTable);
 		}
 
+		// create config table
+		var configTable = "<div class=\"table-responsive\">" +
+			"<table class=\"table table-bordered table-hover table-striped\">" +
+			"<tr><td><b>Key</b></td><td><b>Value</b></td></tr>";
+		$.each(job.executionConfig, function(key, value) {
+			if(key == "userConfig") {
+				return;
+			}
+			configTable += "<tr><td>"+key+"</td><td>"+value+"</td></tr>";
+		});
+		if(job.executionConfig.hasOwnProperty("userConfig")) {
+			configTable += "<tr><td colspan=\"2\"><b>User Configuration</b></td></tr>";
+			$.each(job.executionConfig.userConfig, function(key, value) {
+				configTable += "<tr><td>"+key+"</td><td>"+value+"</td></tr>";
+			});
+		}
+		configTable += "</table></div>";
+		$("#config").html(configTable);
+
 		var data = new google.visualization.DataTable();
 		data.addColumn('datetime', 'start');
 		data.addColumn('datetime', 'end');

http://git-wip-us.apache.org/repos/asf/flink/blob/b335f587/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 7073f07..3f80833 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.scala.hadoop.mapred
 import org.apache.flink.api.scala.hadoop.mapreduce
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.Configuration
 
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
@@ -635,8 +636,8 @@ object ExecutionEnvironment {
 
   /**
    * Creates a local execution environment. The local execution environment will run the program in
-   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
-   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
+   * a multi-threaded fashion in the same JVM as the environment was created in. The parallelism of
+   * the local environment is the number of hardware contexts (CPU cores/threads).
    */
   def createLocalEnvironment(
       parallelism: Int = Runtime.getRuntime.availableProcessors())
@@ -647,6 +648,16 @@ object ExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in.
+   * This method allows passing a custom Configuration to the local environment.
+   */
+  def createLocalEnvironment(customConfiguration: Configuration): ExecutionEnvironment = {
+    val javaEnv = JavaEnv.createLocalEnvironment(customConfiguration)
+    new ExecutionEnvironment(javaEnv)
+  }
+
+  /**
    * Creates an execution environment that uses Java Collections underneath. This will execute in a
    * single thread in the current JVM. It is very fast but will fail if the data does not fit into
    * memory. This is useful during implementation and for debugging.