You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/18 12:02:47 UTC
incubator-flink git commit: [FLINK-1207] Context environments are
realized through factories - local execution blocking is reset after each run
Repository: incubator-flink
Updated Branches:
refs/heads/master ae07abe37 -> 83d02563e
[FLINK-1207] Context environments are realized through factories
- local execution blocking is reset after each run
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/83d02563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/83d02563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/83d02563
Branch: refs/heads/master
Commit: 83d02563ea4a1c7d05540849bf3bf033d968b021
Parents: ae07abe
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 19:04:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 22:07:48 2014 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/program/Client.java | 44 ++++++++++-------
.../client/program/ContextEnvironment.java | 52 +++++++++++++++++---
.../flink/client/program/PackagedProgram.java | 14 +++++-
.../flink/client/web/JobSubmissionServlet.java | 18 +++++--
.../ExecutionPlanAfterExecutionTest.java | 23 ++++++++-
.../client/program/PackagedProgramTest.java | 20 +++++++-
.../flink/api/java/ExecutionEnvironment.java | 16 +++---
.../api/java/ExecutionEnvironmentFactory.java | 32 ++++++++++++
.../flink/test/util/JavaProgramTestBase.java | 19 ++++++-
9 files changed, 195 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 6243c96..c29fddf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client.program;
import java.io.ByteArrayOutputStream;
@@ -26,7 +25,6 @@ import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.JobExecutionResult;
@@ -52,6 +50,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import com.google.common.base.Preconditions;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -156,7 +155,7 @@ public class Client {
ByteArrayOutputStream baes = new ByteArrayOutputStream();
System.setErr(new PrintStream(baes));
try {
- ContextEnvironment.disableLocalExecution();
+ ContextEnvironment.enableLocalExecution(false);
prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
@@ -169,7 +168,9 @@ public class Client {
} else {
throw new ProgramInvocationException("The program caused an error: ", t);
}
- } finally {
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
System.setOut(originalOut);
System.setErr(originalErr);
System.err.println(baes);
@@ -177,9 +178,9 @@ public class Client {
}
throw new ProgramInvocationException(
- "The program plan could not be fetched - the program aborted pre-maturely. <br/><br/>"
- + "System.err: "+StringEscapeUtils.escapeHtml4(baes.toString())+" <br/>"
- + "System.out: "+StringEscapeUtils.escapeHtml4(baos.toString())+" <br/>" );
+ "The program plan could not be fetched - the program aborted pre-maturely.\n"
+ + "System.err: " + baes.toString() + '\n'
+ + "System.out: " + baos.toString() + '\n');
}
else {
throw new RuntimeException();
@@ -230,18 +231,17 @@ public class Client {
return run(prog.getPlanWithJars(), parallelism, wait);
}
else if (prog.isUsingInteractiveMode()) {
- ContextEnvironment env = new ContextEnvironment(this, prog.getAllLibraries(), prog.getUserCodeClassLoader());
-
- if (parallelism > 0) {
- env.setDegreeOfParallelism(parallelism);
- }
- env.setAsContext();
-
- ContextEnvironment.disableLocalExecution();
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism);
+ ContextEnvironment.enableLocalExecution(false);
if (wait) {
// invoke here
- prog.invokeInteractiveModeForExecution();
+ try {
+ prog.invokeInteractiveModeForExecution();
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ }
}
else {
// invoke in the background
@@ -253,6 +253,9 @@ public class Client {
catch (Throwable t) {
LOG.error("The program execution failed.", t);
}
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ }
}
};
backGroundRunner.start();
@@ -360,7 +363,14 @@ public class Client {
}
private void setAsContext() {
- initializeContextEnvironment(this);
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return OptimizerPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 89b301a..891ec1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
@@ -71,20 +72,55 @@ public class ContextEnvironment extends ExecutionEnvironment {
+ ") : " + getIdString();
}
+ public Client getClient() {
+ return this.client;
+ }
- public void setAsContext() {
- initializeContextEnvironment(this);
+ public List<File> getJars(){
+ return jarFilesToAttach;
}
- public static void disableLocalExecution() {
- ExecutionEnvironment.disableLocalExecution();
+ // --------------------------------------------------------------------------------------------
+
+ static void setAsContext(Client client, List<File> jarFilesToAttach,
+ ClassLoader userCodeClassLoader, int defaultParallelism)
+ {
+ initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism));
}
- public Client getClient() {
- return this.client;
+ protected static void enableLocalExecution(boolean enabled) {
+ ExecutionEnvironment.enableLocalExecution(enabled);
}
- public List<File> getJars(){
- return jarFilesToAttach;
+ // --------------------------------------------------------------------------------------------
+
+ public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
+
+ private final Client client;
+
+ private final List<File> jarFilesToAttach;
+
+ private final ClassLoader userCodeClassLoader;
+
+ private final int defaultParallelism;
+
+
+ public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach,
+ ClassLoader userCodeClassLoader, int defaultParallelism)
+ {
+ this.client = client;
+ this.jarFilesToAttach = jarFilesToAttach;
+ this.userCodeClassLoader = userCodeClassLoader;
+ this.defaultParallelism = defaultParallelism;
+ }
+
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader);
+ if (defaultParallelism > 0) {
+ env.setDegreeOfParallelism(defaultParallelism);
+ }
+ return env;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 4d54f6b..b70a83c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.dag.DataSinkNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
@@ -261,7 +262,7 @@ public class PackagedProgram {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
- ContextEnvironment.disableLocalExecution();
+ ContextEnvironment.enableLocalExecution(false);
invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
@@ -275,6 +276,9 @@ public class PackagedProgram {
throw new ProgramInvocationException("The program caused an error: ", t);
}
}
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ }
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
@@ -705,7 +709,13 @@ public class PackagedProgram {
}
public void setAsContext() {
- initializeContextEnvironment(this);
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return PreviewPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
}
public Plan getPlan() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 9cc6fad..84cb011 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -35,6 +35,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -183,9 +184,12 @@ public class JobSubmissionServlet extends HttpServlet {
e.getCause().printStackTrace(w);
}
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
+
showErrorPage(resp, "An error occurred while invoking the program:<br/><br/>"
+ e.getMessage() + "<br/>"
- + "<br/><br/><pre>" + sw.toString() + "</pre>");
+ + "<br/><br/><pre>" + message + "</pre>");
return;
}
catch (CompilerException cex) {
@@ -193,11 +197,14 @@ public class JobSubmissionServlet extends HttpServlet {
StringWriter sw = new StringWriter();
PrintWriter w = new PrintWriter(sw);
cex.printStackTrace(w);
+
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
showErrorPage(resp, "An error occurred in the compiler:<br/><br/>"
+ cex.getMessage() + "<br/>"
- + (cex.getCause()!= null?"Caused by: " + cex.getCause().getMessage():"")
- + "<br/><br/><pre>" + sw.toString() + "</pre>");
+ + (cex.getCause() != null ? "Caused by: " + cex.getCause().getMessage():"")
+ + "<br/><br/><pre>" + message + "</pre>");
return;
}
catch (Throwable t) {
@@ -206,8 +213,11 @@ public class JobSubmissionServlet extends HttpServlet {
PrintWriter w = new PrintWriter(sw);
t.printStackTrace(w);
+ String message = sw.toString();
+ message = StringEscapeUtils.escapeHtml4(message);
+
showErrorPage(resp, "An unexpected error occurred:<br/><br/>" + t.getMessage() + "<br/><br/><pre>"
- + sw.toString() + "</pre>");
+ + message + "</pre>");
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 95b7de3..2dd5148 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.DiscardingOuputFormat;
import org.junit.Test;
@@ -31,7 +32,8 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
@Test
public void testExecuteAfterGetExecutionPlan() {
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ ExecutionEnvironment env = new LocalEnvironment();
+
DataSet<Integer> baseSet = env.fromElements(1, 2);
DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
@@ -46,4 +48,23 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
fail("Cannot run both #getExecutionPlan and #execute.");
}
}
+
+ @Test
+ public void testCreatePlanAfterGetExecutionPlan() {
+ ExecutionEnvironment env = new LocalEnvironment();
+
+ DataSet<Integer> baseSet = env.fromElements(1, 2);
+
+ DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
+ @Override public Integer map(Integer value) throws Exception { return value * 2; }
+ });
+ result.output(new DiscardingOuputFormat<Integer>());
+
+ try {
+ env.getExecutionPlan();
+ env.createProgramPlan();
+ } catch (Exception e) {
+ fail("Cannot run both #getExecutionPlan and #execute.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 07a2669..1a9f17f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.client.program;
import java.io.File;
+import java.io.PrintStream;
import org.apache.flink.client.CliFrontendTestUtils;
import org.apache.flink.client.program.PackagedProgram;
@@ -32,7 +33,19 @@ public class PackagedProgramTest {
public void testGetPreviewPlan() {
try {
PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
- Assert.assertNotNull(prog.getPreviewPlan());
+
+ final PrintStream out = System.out;
+ final PrintStream err = System.err;
+ try {
+ System.setOut(new PrintStream(new NullOutputStream()));
+ System.setErr(new PrintStream(new NullOutputStream()));
+
+ Assert.assertNotNull(prog.getPreviewPlan());
+ }
+ finally {
+ System.setOut(out);
+ System.setErr(err);
+ }
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -40,4 +53,9 @@ public class PackagedProgramTest {
Assert.fail("Test is erroneous: " + e.getMessage());
}
}
+
+ private static final class NullOutputStream extends java.io.OutputStream {
+ @Override
+ public void write(int b) {}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index f549a93..2f9f661 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -78,7 +78,7 @@ import org.apache.flink.util.SplittableIterator;
public abstract class ExecutionEnvironment {
/** The environment of the context (local by default, cluster if invoked through command line) */
- private static ExecutionEnvironment contextEnvironment;
+ private static ExecutionEnvironmentFactory contextEnvironmentFactory;
/** The default parallelism used by local environments */
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
@@ -756,7 +756,8 @@ public abstract class ExecutionEnvironment {
* @return The execution environment of the context in which the program is executed.
*/
public static ExecutionEnvironment getExecutionEnvironment() {
- return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
+ return contextEnvironmentFactory == null ?
+ createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
}
/**
@@ -835,20 +836,19 @@ public abstract class ExecutionEnvironment {
// Methods to control the context and local environments for execution from packaged programs
// --------------------------------------------------------------------------------------------
- protected static void initializeContextEnvironment(ExecutionEnvironment ctx) {
- contextEnvironment = ctx;
+ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
+ contextEnvironmentFactory = ctx;
}
protected static boolean isContextEnvironmentSet() {
- return contextEnvironment != null;
+ return contextEnvironmentFactory != null;
}
- protected static void disableLocalExecution() {
- allowLocalExecution = false;
+ protected static void enableLocalExecution(boolean enabled) {
+ allowLocalExecution = enabled;
}
public static boolean localExecutionIsAllowed() {
return allowLocalExecution;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
new file mode 100644
index 0000000..5472887
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java;
+
+/**
+ * Factory class for execution environments.
+ */
+public interface ExecutionEnvironmentFactory {
+
+ /**
+ * Creates an ExecutionEnvironment from this factory.
+ *
+ * @return An ExecutionEnvironment.
+ */
+ ExecutionEnvironment createExecutionEnvironment();
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/83d02563/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index f75697b..0e9fefa 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -35,6 +35,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.tuple.Tuple;
@@ -243,7 +244,14 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
private void setAsContext() {
- initializeContextEnvironment(this);
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return TestEnvironment.this;
+ }
+ };
+
+ initializeContextEnvironment(factory);
}
}
@@ -264,7 +272,14 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
private void setAsContext() {
- initializeContextEnvironment(this);
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return CollectionTestEnvironment.this;
+ }
+ };
+
+ initializeContextEnvironment(factory);
}
}