You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/18 12:02:47 UTC

incubator-flink git commit: [FLINK-1207] Context environments are realized through factories - local execution blocking is reset after each run

Repository: incubator-flink
Updated Branches:
  refs/heads/master ae07abe37 -> 83d02563e


[FLINK-1207] Context environments are realized through factories
  - local execution blocking is reset after each run


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/83d02563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/83d02563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/83d02563

Branch: refs/heads/master
Commit: 83d02563ea4a1c7d05540849bf3bf033d968b021
Parents: ae07abe
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 19:04:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 22:07:48 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java | 44 ++++++++++-------
 .../client/program/ContextEnvironment.java      | 52 +++++++++++++++++---
 .../flink/client/program/PackagedProgram.java   | 14 +++++-
 .../flink/client/web/JobSubmissionServlet.java  | 18 +++++--
 .../ExecutionPlanAfterExecutionTest.java        | 23 ++++++++-
 .../client/program/PackagedProgramTest.java     | 20 +++++++-
 .../flink/api/java/ExecutionEnvironment.java    | 16 +++---
 .../api/java/ExecutionEnvironmentFactory.java   | 32 ++++++++++++
 .../flink/test/util/JavaProgramTestBase.java    | 19 ++++++-
 9 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 6243c96..c29fddf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 import java.io.ByteArrayOutputStream;
@@ -26,7 +25,6 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -52,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -156,7 +155,7 @@ public class Client {
 			ByteArrayOutputStream baes = new ByteArrayOutputStream();
 			System.setErr(new PrintStream(baes));
 			try {
-				ContextEnvironment.disableLocalExecution();
+				ContextEnvironment.enableLocalExecution(false);
 				prog.invokeInteractiveModeForExecution();
 			}
 			catch (ProgramInvocationException e) {
@@ -169,7 +168,9 @@ public class Client {
 				} else {
 					throw new ProgramInvocationException("The program caused an error: ", t);
 				}
-			} finally {
+			}
+			finally {
+				ContextEnvironment.enableLocalExecution(true);
 				System.setOut(originalOut);
 				System.setErr(originalErr);
 				System.err.println(baes);
@@ -177,9 +178,9 @@ public class Client {
 			}
 			
 			throw new ProgramInvocationException(
-					"The program plan could not be fetched - the program aborted pre-maturely. <br/><br/>"
-					+ "System.err: "+StringEscapeUtils.escapeHtml4(baes.toString())+" <br/>"
-					+ "System.out: "+StringEscapeUtils.escapeHtml4(baos.toString())+" <br/>" );
+					"The program plan could not be fetched - the program aborted pre-maturely.\n"
+					+ "System.err: " + baes.toString() + '\n'
+					+ "System.out: " + baos.toString() + '\n');
 		}
 		else {
 			throw new RuntimeException();
@@ -230,18 +231,17 @@ public class Client {
 			return run(prog.getPlanWithJars(), parallelism, wait);
 		}
 		else if (prog.isUsingInteractiveMode()) {
-			ContextEnvironment env = new ContextEnvironment(this, prog.getAllLibraries(), prog.getUserCodeClassLoader());
-			
-			if (parallelism > 0) {
-				env.setDegreeOfParallelism(parallelism);
-			}
-			env.setAsContext();
-			
-			ContextEnvironment.disableLocalExecution();
 			
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism);
+			ContextEnvironment.enableLocalExecution(false);
 			if (wait) {
 				// invoke here
-				prog.invokeInteractiveModeForExecution();
+				try {
+					prog.invokeInteractiveModeForExecution();
+				}
+				finally {
+					ContextEnvironment.enableLocalExecution(true);
+				}
 			}
 			else {
 				// invoke in the background
@@ -253,6 +253,9 @@ public class Client {
 						catch (Throwable t) {
 							LOG.error("The program execution failed.", t);
 						}
+						finally {
+							ContextEnvironment.enableLocalExecution(true);
+						}
 					}
 				};
 				backGroundRunner.start();
@@ -360,7 +363,14 @@ public class Client {
 		}
 		
 		private void setAsContext() {
-			initializeContextEnvironment(this);
+			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+				
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return OptimizerPlanEnvironment.this;
+				}
+			};
+			initializeContextEnvironment(factory);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 89b301a..891ec1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 
@@ -71,20 +72,55 @@ public class ContextEnvironment extends ExecutionEnvironment {
 				+ ") : " + getIdString();
 	}
 	
+	public Client getClient() {
+		return this.client;
+	}
 	
-	public void setAsContext() {
-		initializeContextEnvironment(this);
+	public List<File> getJars(){
+		return jarFilesToAttach;
 	}
 	
-	public static void disableLocalExecution() {
-		ExecutionEnvironment.disableLocalExecution();
+	// --------------------------------------------------------------------------------------------
+	
+	static void setAsContext(Client client, List<File> jarFilesToAttach, 
+				ClassLoader userCodeClassLoader, int defaultParallelism)
+	{
+		initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism));
 	}
 	
-	public Client getClient() {
-		return this.client;
+	protected static void enableLocalExecution(boolean enabled) {
+		ExecutionEnvironment.enableLocalExecution(enabled);
 	}
 	
-	public List<File> getJars(){
-		return jarFilesToAttach;
+	// --------------------------------------------------------------------------------------------
+	
+	public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
+		
+		private final Client client;
+		
+		private final List<File> jarFilesToAttach;
+		
+		private final ClassLoader userCodeClassLoader;
+		
+		private final int defaultParallelism;
+		
+
+		public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach, 
+				ClassLoader userCodeClassLoader, int defaultParallelism)
+		{
+			this.client = client;
+			this.jarFilesToAttach = jarFilesToAttach;
+			this.userCodeClassLoader = userCodeClassLoader;
+			this.defaultParallelism = defaultParallelism;
+		}
+		
+		@Override
+		public ExecutionEnvironment createExecutionEnvironment() {
+			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader);
+			if (defaultParallelism > 0) {
+				env.setDegreeOfParallelism(defaultParallelism);
+			}
+			return env;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 4d54f6b..b70a83c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.dag.DataSinkNode;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
@@ -261,7 +262,7 @@ public class PackagedProgram {
 			PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 			env.setAsContext();
 			try {
-				ContextEnvironment.disableLocalExecution();
+				ContextEnvironment.enableLocalExecution(false);
 				invokeInteractiveModeForExecution();
 			}
 			catch (ProgramInvocationException e) {
@@ -275,6 +276,9 @@ public class PackagedProgram {
 					throw new ProgramInvocationException("The program caused an error: ", t);
 				}
 			}
+			finally {
+				ContextEnvironment.enableLocalExecution(true);
+			}
 			
 			if (env.previewPlan != null) {
 				previewPlan =  env.previewPlan;
@@ -705,7 +709,13 @@ public class PackagedProgram {
 		}
 		
 		public void setAsContext() {
-			initializeContextEnvironment(this);
+			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return PreviewPlanEnvironment.this;
+				}
+			};
+			initializeContextEnvironment(factory);
 		}
 
 		public Plan getPlan() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 9cc6fad..84cb011 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -35,6 +35,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -183,9 +184,12 @@ public class JobSubmissionServlet extends HttpServlet {
 					e.getCause().printStackTrace(w);
 				}
 
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+				
 				showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
 					+ e.getMessage() + "<br/>"
-					+ "<br/><br/><pre>" + sw.toString() + "</pre>");
+					+ "<br/><br/><pre>" + message + "</pre>");
 				return;
 			}
 			catch (CompilerException cex) {
@@ -193,11 +197,14 @@ public class JobSubmissionServlet extends HttpServlet {
 				StringWriter sw = new StringWriter();
 				PrintWriter w = new PrintWriter(sw);
 				cex.printStackTrace(w);
+				
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
 
 				showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
 					+ cex.getMessage() + "<br/>"
-					+ (cex.getCause()!= null?"Caused by: " + cex.getCause().getMessage():"")
-					+ "<br/><br/><pre>" + sw.toString() + "</pre>");
+					+ (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
+					+ "<br/><br/><pre>" + message + "</pre>");
 				return;
 			}
 			catch (Throwable t) {
@@ -206,8 +213,11 @@ public class JobSubmissionServlet extends HttpServlet {
 				PrintWriter w = new PrintWriter(sw);
 				t.printStackTrace(w);
 
+				String message = sw.toString();
+				message = StringEscapeUtils.escapeHtml4(message);
+				
 				showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
-					+ sw.toString() + "</pre>");
+					+ message + "</pre>");
 				return;
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 95b7de3..2dd5148 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.DiscardingOuputFormat;
 import org.junit.Test;
 
@@ -31,7 +32,8 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 
 	@Test
 	public void testExecuteAfterGetExecutionPlan() {
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		ExecutionEnvironment env = new LocalEnvironment();
+		
 		DataSet<Integer> baseSet = env.fromElements(1, 2);
 
 		DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
@@ -46,4 +48,23 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 			fail("Cannot run both #getExecutionPlan and #execute.");
 		}
 	}
+	
+	@Test
+	public void testCreatePlanAfterGetExecutionPlan() {
+		ExecutionEnvironment env = new LocalEnvironment();
+		
+		DataSet<Integer> baseSet = env.fromElements(1, 2);
+
+		DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
+			@Override public Integer map(Integer value) throws Exception { return value * 2; }
+		});
+		result.output(new DiscardingOuputFormat<Integer>());
+
+		try {
+			env.getExecutionPlan();
+			env.createProgramPlan();
+		} catch (Exception e) {
+			fail("Cannot run both #getExecutionPlan and #execute.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 07a2669..1a9f17f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.program;
 
 import java.io.File;
+import java.io.PrintStream;
 
 import org.apache.flink.client.CliFrontendTestUtils;
 import org.apache.flink.client.program.PackagedProgram;
@@ -32,7 +33,19 @@ public class PackagedProgramTest {
 	public void testGetPreviewPlan() {
 		try {
 			PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
-			Assert.assertNotNull(prog.getPreviewPlan());
+			
+			final PrintStream out = System.out;
+			final PrintStream err = System.err;
+			try {
+				System.setOut(new PrintStream(new NullOutputStream()));
+				System.setErr(new PrintStream(new NullOutputStream()));
+				
+				Assert.assertNotNull(prog.getPreviewPlan());
+			}
+			finally {
+				System.setOut(out);
+				System.setErr(err);
+			}
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());
@@ -40,4 +53,9 @@ public class PackagedProgramTest {
 			Assert.fail("Test is erroneous: " + e.getMessage());
 		}
 	}
+	
+	private static final class NullOutputStream extends java.io.OutputStream {
+		@Override
+		public void write(int b) {}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index f549a93..2f9f661 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -78,7 +78,7 @@ import org.apache.flink.util.SplittableIterator;
 public abstract class ExecutionEnvironment {
 	
 	/** The environment of the context (local by default, cluster if invoked through command line) */
-	private static ExecutionEnvironment contextEnvironment;
+	private static ExecutionEnvironmentFactory contextEnvironmentFactory;
 	
 	/** The default parallelism used by local environments */
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
@@ -756,7 +756,8 @@ public abstract class ExecutionEnvironment {
 	 * @return The execution environment of the context in which the program is executed.
 	 */
 	public static ExecutionEnvironment getExecutionEnvironment() {
-		return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
+		return contextEnvironmentFactory == null ? 
+				createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
 	}
 	
 	/**
@@ -835,20 +836,19 @@ public abstract class ExecutionEnvironment {
 	//  Methods to control the context and local environments for execution from packaged programs
 	// --------------------------------------------------------------------------------------------
 	
-	protected static void initializeContextEnvironment(ExecutionEnvironment ctx) {
-		contextEnvironment = ctx;
+	protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
+		contextEnvironmentFactory = ctx;
 	}
 	
 	protected static boolean isContextEnvironmentSet() {
-		return contextEnvironment != null;
+		return contextEnvironmentFactory != null;
 	}
 	
-	protected static void disableLocalExecution() {
-		allowLocalExecution = false;
+	protected static void enableLocalExecution(boolean enabled) {
+		allowLocalExecution = enabled;
 	}
 	
 	public static boolean localExecutionIsAllowed() {
 		return allowLocalExecution;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
new file mode 100644
index 0000000..5472887
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+/**
+ * Factory class for execution environments.
+ */
+public interface ExecutionEnvironmentFactory {
+	
+	/**
+	 * Creates an ExecutionEnvironment from this factory.
+	 * 
+	 * @return An ExecutionEnvironment.
+	 */
+	ExecutionEnvironment createExecutionEnvironment();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index f75697b..0e9fefa 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 
 
@@ -243,7 +244,14 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		}
 		
 		private void setAsContext() {
-			initializeContextEnvironment(this);
+			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return TestEnvironment.this;
+				}
+			};
+			
+			initializeContextEnvironment(factory);
 		}
 	}
 	
@@ -264,7 +272,14 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		}
 		
 		private void setAsContext() {
-			initializeContextEnvironment(this);
+			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					return CollectionTestEnvironment.this;
+				}
+			};
+			
+			initializeContextEnvironment(factory);
 		}
 	}