Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/13 16:06:15 UTC

[2/5] incubator-flink git commit: [streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment

[streaming] StreamExecutionEnvironment rework + user class loader fix for cluster deployment


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6867f9b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6867f9b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6867f9b9

Branch: refs/heads/master
Commit: 6867f9b93ec1ad9a627450c4fbd0b5ff98ef6148
Parents: c6dd9b1
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 12 01:11:36 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Nov 13 15:24:04 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java    | 18 -----
 .../flink/streaming/api/StreamConfig.java       | 73 +++++++++++--------
 .../api/environment/LocalStreamEnvironment.java | 17 ++---
 .../environment/RemoteStreamEnvironment.java    | 40 ++++++-----
 .../environment/StreamContextEnvironment.java   | 75 ++++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 69 +++++-------------
 .../api/streamvertex/CoStreamVertex.java        |  9 ++-
 .../api/streamvertex/InputHandler.java          |  7 +-
 .../api/streamvertex/OutputHandler.java         |  8 ++-
 .../api/streamvertex/StreamVertex.java          |  9 ++-
 .../flink/streaming/util/ClusterUtil.java       | 40 +----------
 .../api/streamvertex/StreamVertexTest.java      |  5 --
 .../examples/iteration/IterateExample.java      |  4 +-
 .../client/program/ContextEnvironment.java      |  4 ++
 .../flink/client/program/JobWithJars.java       |  2 +-
 15 files changed, 193 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
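
For context on the user-facing side of the rework: getExecutionEnvironment() (see the StreamExecutionEnvironment diff below) now checks whether the batch ExecutionEnvironment is a ContextEnvironment, i.e. whether the program was invoked through the client, and if so returns a StreamContextEnvironment bound to that client, its attached jar files and its degree of parallelism; otherwise it falls back to a local environment. A minimal driver sketch, assuming the 0.7-era streaming API; the Doubler function and the sample input are illustrative placeholders, not part of this commit:

    import java.util.Arrays;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ContextEnvironmentSketch {

        public static void main(String[] args) throws Exception {
            // The same call works in the IDE (LocalStreamEnvironment) and inside a
            // jar submitted through the client (StreamContextEnvironment, which also
            // carries the user-code class loader needed to deserialize user types).
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromCollection(Arrays.asList(1, 2, 3))
                    .map(new Doubler())
                    .print();

            env.execute("context environment sketch");
        }

        // Illustrative user function.
        public static class Doubler implements MapFunction<Integer, Integer> {
            private static final long serialVersionUID = 1L;

            public Integer map(Integer value) {
                return value * 2;
            }
        }
    }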


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 5a8fd22..df59be1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -80,8 +80,6 @@ public class JobGraphBuilder {
 	private Map<String, Long> iterationWaitTime;
 	private Map<String, Map<String, OperatorState<?>>> operatorStates;
 
-	private int degreeOfParallelism;
-	private int executionParallelism;
 
 	/**
 	 * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -120,22 +118,6 @@ public class JobGraphBuilder {
 		}
 	}
 
-	public int getDefaultParallelism() {
-		return degreeOfParallelism;
-	}
-
-	public void setDefaultParallelism(int defaultParallelism) {
-		this.degreeOfParallelism = defaultParallelism;
-	}
-
-	public int getExecutionParallelism() {
-		return executionParallelism;
-	}
-
-	public void setExecutionParallelism(int executionParallelism) {
-		this.executionParallelism = executionParallelism;
-	}
-
 	/**
 	 * Adds a vertex to the streaming JobGraph with the given parameters
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7da6265..3dba376 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.util.InstantiationUtil;
 
 public class StreamConfig {
 	private static final String INPUT_TYPE = "inputType_";
@@ -98,20 +99,20 @@ public class StreamConfig {
 		setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn1() {
-		return getTypeInfo(TYPE_WRAPPER_IN_1);
+	public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoIn2() {
-		return getTypeInfo(TYPE_WRAPPER_IN_2);
+	public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut1() {
-		return getTypeInfo(TYPE_WRAPPER_OUT_1);
+	public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
 	}
 
-	public <T> TypeInformation<T> getTypeInfoOut2() {
-		return getTypeInfo(TYPE_WRAPPER_OUT_2);
+	public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
+		return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
 	}
 
 	private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
@@ -119,18 +120,17 @@ public class StreamConfig {
 	}
 
 	@SuppressWarnings("unchecked")
-	private <T> TypeInformation<T> getTypeInfo(String key) {
-		byte[] serializedWrapper = config.getBytes(key, null);
+	private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
 
-		if (serializedWrapper == null) {
-			throw new RuntimeException("TypeSerializationWrapper must be set");
+		TypeWrapper<T> typeWrapper;
+		try {
+			typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
+					cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot load typeinfo");
 		}
-
-		TypeWrapper<T> typeWrapper = (TypeWrapper<T>) SerializationUtils
-				.deserialize(serializedWrapper);
 		if (typeWrapper != null) {
 			return typeWrapper.getTypeInfo();
-
 		} else {
 			return null;
 		}
@@ -166,9 +166,10 @@ public class StreamConfig {
 		}
 	}
 
-	public <T> T getUserInvokable() {
+	@SuppressWarnings({ "unchecked" })
+	public <T> T getUserInvokable(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(SERIALIZEDUDF, null));
+			return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot instantiate user function", e);
 		}
@@ -189,10 +190,10 @@ public class StreamConfig {
 		}
 	}
 
-	public Object getFunction() {
+	public Object getFunction(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
-		} catch (SerializationException e) {
+			return InstantiationUtil.readObjectFromConfig(this.config, FUNCTION, cl);
+		} catch (Exception e) {
 			throw new RuntimeException("Cannot deserialize invokable object", e);
 		}
 	}
@@ -216,9 +217,11 @@ public class StreamConfig {
 		}
 	}
 
-	public <T> OutputSelector<T> getOutputSelector() {
+	@SuppressWarnings("unchecked")
+	public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
 		try {
-			return SerializationUtils.deserialize(config.getBytes(OUTPUT_SELECTOR, null));
+			return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+					OUTPUT_SELECTOR, cl);
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
 		}
@@ -254,10 +257,16 @@ public class StreamConfig {
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
-			IOException {
-		return SerializationUtils.deserialize(config.getBytes(PARTITIONER_OBJECT + outputIndex,
-				SerializationUtils.serialize(new ShufflePartitioner<T>())));
+	public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
+			throws ClassNotFoundException, IOException {
+		@SuppressWarnings("unchecked")
+		StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
+				.readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+		if (partitioner != null) {
+			return partitioner;
+		} else {
+			return new ShufflePartitioner<T>();
+		}
 	}
 
 	public void setSelectAll(int outputIndex, Boolean selectAll) {
@@ -323,8 +332,14 @@ public class StreamConfig {
 		config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states));
 	}
 
-	public Map<String, OperatorState<?>> getOperatorStates() {
-		return SerializationUtils.deserialize(config.getBytes(OPERATOR_STATES, null));
+	@SuppressWarnings("unchecked")
+	public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
+		try {
+			return (Map<String, OperatorState<?>>) InstantiationUtil.readObjectFromConfig(
+					this.config, OPERATOR_STATES, cl);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not load operator state");
+		}
 	}
 
 }
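
The pattern running through these StreamConfig accessors: deserialize the stored object with an explicitly supplied user-code ClassLoader via InstantiationUtil.readObjectFromConfig (instead of SerializationUtils, which does not use the job's user-code class loader), and where it makes sense fall back to a default such as ShufflePartitioner when nothing was stored. Condensed into a generic helper sketch; readWithDefault and its key parameter are illustrative, not part of the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.InstantiationUtil;

    public final class ConfigReadSketch {

        // Mirrors the accessors above: read with the user-code class loader,
        // fall back to a caller-supplied default if the key was never set.
        @SuppressWarnings("unchecked")
        public static <T> T readWithDefault(Configuration config, String key, ClassLoader cl, T defaultValue) {
            try {
                T value = (T) InstantiationUtil.readObjectFromConfig(config, key, cl);
                return value != null ? value : defaultValue;
            } catch (Exception e) {
                throw new RuntimeException("Cannot deserialize object stored under " + key, e);
            }
        }
    }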

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5c0f555..505f0e7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -21,13 +21,15 @@ import org.apache.flink.streaming.util.ClusterUtil;
 
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+	protected static ClassLoader userClassLoader;
+
 	/**
 	 * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
 	 * default name.
 	 */
 	@Override
 	public void execute() throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism());
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
 	}
 
 	/**
@@ -39,19 +41,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public void execute(String jobName) throws Exception {
-		if (localExecutionIsAllowed()) {
-			ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
-					getExecutionParallelism());
-		} else {
-			ClusterUtil.runOnLocalCluster(this.jobGraphBuilder.getJobGraph(jobName),
-					getExecutionParallelism());
-		}
-
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
+				getDegreeOfParallelism());
 	}
 
 	public void executeTest(long memorySize) throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
+		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
 				memorySize);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 864e18d..d833c8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -20,9 +20,8 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -30,13 +29,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
 	private String host;
 	private int port;
-	private String[] jarFiles;
+	private List<File> jarFiles;
 
 	/**
 	 * Creates a new RemoteStreamEnvironment that points to the master
@@ -65,19 +66,28 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 		this.host = host;
 		this.port = port;
-		this.jarFiles = jarFiles;
+		this.jarFiles = new ArrayList<File>();
+		for (int i = 0; i < jarFiles.length; i++) {
+			File file = new File(jarFiles[i]);
+			try {
+				JobWithJars.checkJarFile(file);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+			}
+			this.jarFiles.add(file);
+		}
 	}
 
 	@Override
 	public void execute() {
-		
+
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph();
 		executeRemotely(jobGraph);
 	}
-	
+
 	@Override
 	public void execute(String jobName) {
-		
+
 		JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
 		executeRemotely(jobGraph);
 	}
@@ -85,25 +95,21 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	/**
 	 * Executes the remote job.
 	 * 
-	 * @param jobGraph jobGraph to execute
+	 * @param jobGraph
+	 *            jobGraph to execute
 	 */
 	private void executeRemotely(JobGraph jobGraph) {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running remotely at {}:{}", host, port);
 		}
 
-		for (int i = 0; i < jarFiles.length; i++) {
-			File file = new File(jarFiles[i]);
-			try {
-				JobWithJars.checkJarFile(file);
-			} catch (IOException e) {
-				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
-			}
+		for (File file : jarFiles) {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}
 
 		Configuration configuration = jobGraph.getJobConfiguration();
-		Client client = new Client(new InetSocketAddress(host, port), configuration, getClass().getClassLoader());
+		Client client = new Client(new InetSocketAddress(host, port), configuration,
+				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
 
 		try {
 			client.run(jobGraph, true);
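
The class-loader part of the fix is in the Client construction above: the jars are validated once in the constructor, and the Client is created with a user-code class loader built over those jars via JobWithJars.buildUserCodeClassLoader (made public in this commit) rather than with getClass().getClassLoader(). A standalone sketch of the same wiring; the host, port and jar path are placeholders:

    import java.io.File;
    import java.net.InetSocketAddress;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.client.program.Client;
    import org.apache.flink.client.program.JobWithJars;
    import org.apache.flink.configuration.Configuration;

    public class RemoteClientSketch {

        public static Client createClient() {
            // Placeholder jar; in RemoteStreamEnvironment this list comes from the constructor.
            List<File> jars = Arrays.asList(new File("/path/to/user-job.jar"));

            // Build a class loader that can resolve classes packaged in the user jar.
            ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars,
                    JobWithJars.class.getClassLoader());

            return new Client(new InetSocketAddress("jobmanager-host", 6123), new Configuration(),
                    userCodeClassLoader);
        }
    }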

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
new file mode 100644
index 0000000..c157435
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class StreamContextEnvironment extends StreamExecutionEnvironment {
+
+	protected static ClassLoader userClassLoader;
+	protected List<File> jars;
+	protected Client client;
+
+	protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+		this.client = client;
+		this.jars = jars;
+		if (dop > 0) {
+			setDegreeOfParallelism(dop);
+		} else {
+			setDegreeOfParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+		}
+	}
+
+	@Override
+	public void execute() throws Exception {
+		execute(null);
+	}
+
+	@Override
+	public void execute(String jobName) throws Exception {
+
+		JobGraph jobGraph;
+		if (jobName == null) {
+			jobGraph = this.jobGraphBuilder.getJobGraph();
+		} else {
+			jobGraph = this.jobGraphBuilder.getJobGraph(jobName);
+		}
+
+		for (File file : jars) {
+			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		}
+
+		try {
+			client.run(jobGraph, true);
+
+		} catch (Exception e) {
+			throw e;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0d00db3..600a87a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -20,10 +20,13 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -44,21 +47,10 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
  */
 public abstract class StreamExecutionEnvironment {
 
-	/**
-	 * The environment of the context (local by default, cluster if invoked
-	 * through command line)
-	 */
-	private static StreamExecutionEnvironment contextEnvironment;
-
-	/** flag to disable local executor when using the ContextEnvironment */
-	private static boolean allowLocalExecution = true;
-
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
 
 	private int degreeOfParallelism = 1;
 
-	private int executionParallelism = -1;
-
 	private long bufferTimeout = 100;
 
 	protected JobGraphBuilder jobGraphBuilder;
@@ -74,10 +66,6 @@ public abstract class StreamExecutionEnvironment {
 		jobGraphBuilder = new JobGraphBuilder();
 	}
 
-	public int getExecutionParallelism() {
-		return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
-	}
-
 	/**
 	 * Gets the degree of parallelism with which operation are executed by
 	 * default. Operations can individually override this value to use a
@@ -143,21 +131,6 @@ public abstract class StreamExecutionEnvironment {
 		return this.bufferTimeout;
 	}
 
-	/**
-	 * Sets the number of hardware contexts (CPU cores / threads) used when
-	 * executed in {@link LocalStreamEnvironment}.
-	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism in local environment
-	 */
-	public void setExecutionParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
-		}
-
-		this.executionParallelism = degreeOfParallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	// Data stream creations
 	// --------------------------------------------------------------------------------------------
@@ -351,8 +324,19 @@ public abstract class StreamExecutionEnvironment {
 	 *         executed.
 	 */
 	public static StreamExecutionEnvironment getExecutionEnvironment() {
-		allowLocalExecution = ExecutionEnvironment.localExecutionIsAllowed();
-		return createLocalEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (env instanceof ContextEnvironment) {
+			ContextEnvironment ctx = (ContextEnvironment) env;
+			return createContextEnvironment(ctx.getClient(), ctx.getJars(),
+					ctx.getDegreeOfParallelism());
+		} else {
+			return createLocalEnvironment();
+		}
+	}
+
+	private static StreamExecutionEnvironment createContextEnvironment(Client client,
+			List<File> jars, int dop) {
+		return new StreamContextEnvironment(client, jars, dop);
 	}
 
 	/**
@@ -440,27 +424,6 @@ public abstract class StreamExecutionEnvironment {
 		return rec;
 	}
 
-	// --------------------------------------------------------------------------------------------
-	// Methods to control the context and local environments for execution from
-	// packaged programs
-	// --------------------------------------------------------------------------------------------
-
-	protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
-		contextEnvironment = ctx;
-	}
-
-	protected static boolean isContextEnvironmentSet() {
-		return contextEnvironment != null;
-	}
-
-	protected static void disableLocalExecution() {
-		allowLocalExecution = false;
-	}
-
-	public static boolean localExecutionIsAllowed() {
-		return allowLocalExecution;
-	}
-
 	/**
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 5a6519d..2464ff2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -29,8 +29,7 @@ import org.apache.flink.streaming.io.CoReaderIterator;
 import org.apache.flink.streaming.io.CoRecordReader;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class CoStreamVertex<IN1, IN2, OUT> extends
-		StreamVertex<IN1,OUT> {
+public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
 	private OutputHandler<OUT> outputHandler;
 
@@ -53,10 +52,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
 	}
 
 	private void setDeserializers() {
-		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+		TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
 		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
 
-		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+		TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
 		inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
 	}
 
@@ -72,7 +71,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends
 
 	@Override
 	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
+		userInvokable = configuration.getUserInvokable(userClassLoader);
 		userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
 				inputDeserializer2, isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 17d2ae5..9d65a21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -35,10 +35,10 @@ public class InputHandler<IN> {
 	private MutableObjectIterator<StreamRecord<IN>> inputIter;
 	private MutableReader<IOReadableWritable> inputs;
 
-	private StreamVertex<IN,?> streamVertex;
+	private StreamVertex<IN, ?> streamVertex;
 	private StreamConfig configuration;
 
-	public InputHandler(StreamVertex<IN,?> streamComponent) {
+	public InputHandler(StreamVertex<IN, ?> streamComponent) {
 		this.streamVertex = streamComponent;
 		this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
 		try {
@@ -75,7 +75,8 @@ public class InputHandler<IN> {
 	}
 
 	private void setDeserializer() {
-		TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+		TypeInformation<IN> inTupleTypeInfo = configuration
+				.getTypeInfoIn1(streamVertex.userClassLoader);
 		if (inTupleTypeInfo != null) {
 			inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 8b72195..d8eb146 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -81,7 +81,8 @@ public class OutputHandler<OUT> {
 
 	private StreamCollector<OUT> setCollector() {
 		if (streamVertex.configuration.getDirectedEmit()) {
-			OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+			OutputSelector<OUT> outputSelector = streamVertex.configuration
+					.getOutputSelector(streamVertex.userClassLoader);
 
 			collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
 					outSerializationDelegate, outputSelector);
@@ -97,7 +98,7 @@ public class OutputHandler<OUT> {
 	}
 
 	void setSerializers() {
-		outTypeInfo = configuration.getTypeInfoOut1();
+		outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
 		if (outTypeInfo != null) {
 			outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
 			outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
@@ -110,7 +111,8 @@ public class OutputHandler<OUT> {
 		StreamPartitioner<OUT> outputPartitioner = null;
 
 		try {
-			outputPartitioner = configuration.getPartitioner(outputNumber);
+			outputPartitioner = configuration.getPartitioner(streamVertex.userClassLoader,
+					outputNumber);
 
 		} catch (Exception e) {
 			throw new StreamVertexException("Cannot deserialize partitioner for "

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index a8ec98f..2db0d8b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -44,6 +44,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	private StreamingRuntimeContext context;
 	private Map<String, OperatorState<?>> states;
 
+	protected ClassLoader userClassLoader;
+
 	public StreamVertex() {
 		userInvokable = null;
 		numTasks = newVertex();
@@ -63,12 +65,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected void initialize() {
+		this.userClassLoader = getUserCodeClassLoader();
 		this.configuration = new StreamConfig(getTaskConfiguration());
 		this.name = configuration.getVertexName();
 		this.isMutable = configuration.getMutability();
 		this.functionName = configuration.getFunctionName();
-		this.function = configuration.getFunction();
-		this.states = configuration.getOperatorStates();
+		this.function = configuration.getFunction(userClassLoader);
+		this.states = configuration.getOperatorStates(userClassLoader);
 		this.context = createRuntimeContext(name, this.states);
 	}
 
@@ -85,7 +88,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
 	}
 
 	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable();
+		userInvokable = configuration.getUserInvokable(userClassLoader);
 		userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
 				inputHandler.getInputSerializer(), isMutable);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 278cb5a..ebe383d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,10 +19,8 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,44 +70,12 @@ public class ClusterUtil {
 		} catch (Exception e) {
 			throw e;
 		} finally {
-			try {
-				exec.stop();
-			} catch (Throwable t) {
-			}
+			exec.stop();
 		}
 	}
 
-	public static void runOnLocalCluster(JobGraph jobGraph, int degreeOfPrallelism)
-			throws Exception {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running on mini cluster");
-		}
-
-		try {
-
-			Client client = ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-					.getClient();
-
-			client.run(jobGraph, true);
-		} catch (ProgramInvocationException e) {
-			if (e.getMessage().contains("GraphConversionException")) {
-				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
-			} else {
-				throw e;
-			}
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			try {
-			} catch (Throwable t) {
-			}
-		}
-	}
-
-	public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers)
-			throws Exception {
-		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
+	public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+		runOnMiniCluster(jobGraph, numOfSlots, -1);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index e01809d..765de9c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -112,11 +112,6 @@ public class StreamVertexTest {
 		}
 
 		try {
-			env.setExecutionParallelism(-10);
-			fail();
-		} catch (IllegalArgumentException e) {
-		}
-		try {
 			env.generateSequence(1, 10).project(2);
 			fail();
 		} catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 82d81f4..e2094fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -73,13 +73,13 @@ public class IterateExample {
 			input.add(new Tuple2<Double, Integer>(0., 0));
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2)
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 				.setBufferTimeout(1);
 
 		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).iterate()
 				.setMaxWaitTime(3000);
 		
-		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().setParallelism(2).split(new MySelector());
+		SplitDataStream<Tuple2<Double,Integer>> step = it.map(new Step()).shuffle().split(new MySelector());
 		
 		it.closeWith(step.select("iterate"));
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 4f91514..89b301a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -83,4 +83,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public Client getClient() {
 		return this.client;
 	}
+	
+	public List<File> getJars(){
+		return jarFilesToAttach;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6867f9b9/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b8151da..b86487f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -102,7 +102,7 @@ public class JobWithJars {
 		// TODO: Check if proper JAR file
 	}
 	
-	static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
+	public static ClassLoader buildUserCodeClassLoader(List<File> jars, ClassLoader parent) {
 		
 		URL[] urls = new URL[jars.size()];
 		try {