Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/22 21:57:17 UTC
[1/4] flink git commit: [FLINK-2097] temporarily disable session management API
Repository: flink
Updated Branches:
refs/heads/master 7984acc6b -> d58caa8ec
[FLINK-2097] temporarily disable session management API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58caa8e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58caa8e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58caa8e
Branch: refs/heads/master
Commit: d58caa8ec88c348bef540cb5959d80bcf9bd894a
Parents: 71bf2f5
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 15 15:21:20 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/java/ExecutionEnvironment.java | 11 +++++++----
.../java/org/apache/flink/api/java/LocalEnvironment.java | 6 ++++--
.../org/apache/flink/api/java/RemoteEnvironment.java | 6 ++++--
3 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 23b5a57..0f61d88 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -277,10 +277,13 @@ public abstract class ExecutionEnvironment {
* @param timeout The timeout, in seconds.
*/
public void setSessionTimeout(long timeout) {
- if (timeout < 0) {
- throw new IllegalArgumentException("The session timeout must not be less than zero.");
- }
- this.sessionTimeout = timeout;
+ throw new IllegalStateException("Support for sessions is currently disabled. " +
+ "It will be enabled in future Flink versions.");
+ // Session management is disabled, revert this commit to enable
+ //if (timeout < 0) {
+ // throw new IllegalArgumentException("The session timeout must not be less than zero.");
+ //}
+ //this.sessionTimeout = timeout;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 5fd272b..7c85ed9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -78,8 +78,10 @@ public class LocalEnvironment extends ExecutionEnvironment {
}
Plan p = createProgramPlan(jobName);
- p.setJobId(jobID);
- p.setSessionTimeout(sessionTimeout);
+
+ // Session management is disabled, revert this commit to enable
+ //p.setJobId(jobID);
+ //p.setSessionTimeout(sessionTimeout);
JobExecutionResult result = executor.executePlan(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 6ae1f26..63f59d3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -86,8 +86,10 @@ public class RemoteEnvironment extends ExecutionEnvironment {
ensureExecutorCreated();
Plan p = createProgramPlan(jobName);
- p.setJobId(jobID);
- p.setSessionTimeout(sessionTimeout);
+
+ // Session management is disabled, revert this commit to enable
+ //p.setJobId(jobID);
+ //p.setSessionTimeout(sessionTimeout);
JobExecutionResult result = executor.executePlan(p);
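The net effect of this first commit for user programs: any call into the session API now fails fast instead of configuring a timeout. A minimal sketch of hypothetical user code hitting the disabled API (standard context environment assumed):

import org.apache.flink.api.java.ExecutionEnvironment;

public class SessionApiProbe {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        try {
            // Session management is temporarily disabled by this commit, so this
            // call throws instead of storing the timeout.
            env.setSessionTimeout(60);
        }
        catch (IllegalStateException e) {
            // "Support for sessions is currently disabled. It will be enabled in future Flink versions."
            System.err.println(e.getMessage());
        }
    }
}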
[3/4] flink git commit: [FLINK-2097][core] implement a job session management
Posted by mx...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
new file mode 100644
index 0000000..c5ced37
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+
+import java.util.List;
+
+/**
+ * Environment to extract the pre-optimized plan.
+ */
+public final class PreviewPlanEnvironment extends ExecutionEnvironment {
+
+ List<DataSinkNode> previewPlan;
+
+ String preview;
+
+ Plan plan;
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ this.plan = createProgramPlan(jobName);
+ this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
+
+ // do not go on with anything now!
+ throw new OptimizerPlanEnvironment.ProgramAbortException();
+ }
+
+ @Override
+ public String getExecutionPlan() throws Exception {
+ Plan plan = createProgramPlan("unused");
+ this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
+
+ // do not go on with anything now!
+ throw new OptimizerPlanEnvironment.ProgramAbortException();
+ }
+
+ @Override
+ public void startNewSession() throws Exception {
+ }
+
+ public void setAsContext() {
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return PreviewPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
+ }
+
+ public void setPreview(String preview) {
+ this.preview = preview;
+ }
+
+ public Plan getPlan() {
+ return plan;
+ }
+}
\ No newline at end of file
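A hedged sketch of how a client could drive the new environment to capture a pre-optimized plan without running the job; the Runnable standing in for the user program's main() is hypothetical:

import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;

public final class PreviewPlanSketch {

    public static PreviewPlanEnvironment capturePreview(Runnable invokeUserMain) {
        PreviewPlanEnvironment previewEnv = new PreviewPlanEnvironment();
        previewEnv.setAsContext();
        try {
            // The user's main() obtains previewEnv via getExecutionEnvironment();
            // its execute()/getExecutionPlan() call records the preview plan and then aborts.
            invokeUserMain.run();
        }
        catch (OptimizerPlanEnvironment.ProgramAbortException expected) {
            // Expected: execution is intentionally cut short once the plan has been captured.
        }
        return previewEnv;
    }
}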
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index e43d7cc..f83b97c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -54,9 +54,6 @@ import org.slf4j.LoggerFactory;
public class JobSubmissionServlet extends HttpServlet {
- /**
- * Serial UID for serialization interoperability.
- */
private static final long serialVersionUID = 8447312301029847397L;
// ------------------------------------------------------------------------
@@ -96,7 +93,7 @@ public class JobSubmissionServlet extends HttpServlet {
private final Random rand; // random number generator for UID
private final CliFrontend cli;
-
+
public JobSubmissionServlet(CliFrontend cli, File jobDir, File planDir) {
@@ -296,10 +293,11 @@ public class JobSubmissionServlet extends HttpServlet {
return;
}
- Long uid = null;
+ Long uid;
try {
uid = Long.parseLong(id);
- } catch (NumberFormatException nfex) {
+ }
+ catch (NumberFormatException nfex) {
showErrorPage(resp, "An invalid id for the job was provided.");
return;
}
@@ -314,8 +312,8 @@ public class JobSubmissionServlet extends HttpServlet {
// submit the job
try {
- Client client = new Client(GlobalConfiguration.getConfiguration(), job.f0.getUserCodeClassLoader());
- client.run(client.getJobGraph(job.f0, job.f1), false);
+ Client client = new Client(GlobalConfiguration.getConfiguration());
+ client.runDetached(Client.getJobGraph(job.f0, job.f1), job.f0.getUserCodeClassLoader());
}
catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex);
@@ -329,7 +327,8 @@ public class JobSubmissionServlet extends HttpServlet {
// redirect to the start page
resp.sendRedirect(START_PAGE_URL);
- } else if (action.equals(ACTION_BACK_VALUE)) {
+ }
+ else if (action.equals(ACTION_BACK_VALUE)) {
// remove the job from the map
String id = req.getParameter("id");
@@ -337,10 +336,11 @@ public class JobSubmissionServlet extends HttpServlet {
return;
}
- Long uid = null;
+ Long uid;
try {
uid = Long.parseLong(id);
- } catch (NumberFormatException nfex) {
+ }
+ catch (NumberFormatException nfex) {
showErrorPage(resp, "An invalid id for the job was provided.");
return;
}
@@ -350,9 +350,9 @@ public class JobSubmissionServlet extends HttpServlet {
// redirect to the start page
resp.sendRedirect(START_PAGE_URL);
- } else {
+ }
+ else {
showErrorPage(resp, "Invalid action specified.");
- return;
}
}
@@ -428,7 +428,7 @@ public class JobSubmissionServlet extends HttpServlet {
* The string to be split.
* @return The array of split strings.
*/
- private static final List<String> tokenizeArguments(String args) {
+ private static List<String> tokenizeArguments(String args) {
List<String> list = new ArrayList<String>();
StringBuilder curr = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 751783c..c1df541 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -18,28 +18,19 @@
package org.apache.flink.client;
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.configuration.Configuration;
-import org.junit.BeforeClass;
import org.junit.Test;
-import java.net.InetAddress;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
import static org.junit.Assert.*;
public class CliFrontendInfoTest {
- @BeforeClass
- public static void init() {
- CliFrontendTestUtils.pipeSystemOutToNull();
- CliFrontendTestUtils.clearGlobalConfiguration();
- }
-
+ private static PrintStream stdOut;
+ private static PrintStream capture;
+ private static ByteArrayOutputStream buffer;
+
@Test
public void testErrorCases() {
try {
@@ -67,71 +58,49 @@ public class CliFrontendInfoTest {
@Test
public void testShowExecutionPlan() {
+ replaceStdOut();
try {
+
String[] parameters = new String[] { CliFrontendTestUtils.getTestJarPath() };
- InfoTestCliFrontend testFrontend = new InfoTestCliFrontend(-1);
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
int retCode = testFrontend.info(parameters);
assertTrue(retCode == 0);
+ assertTrue(buffer.toString().contains("\"parallelism\": \"1\""));
}
catch (Exception e) {
e.printStackTrace();
fail("Program caused an exception: " + e.getMessage());
+ } finally {
+ restoreStdOut();
}
}
@Test
public void testShowExecutionPlanWithParallelism() {
+ replaceStdOut();
try {
String[] parameters = {"-p", "17", CliFrontendTestUtils.getTestJarPath()};
- InfoTestCliFrontend testFrontend = new InfoTestCliFrontend(17);
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
int retCode = testFrontend.info(parameters);
assertTrue(retCode == 0);
+ assertTrue(buffer.toString().contains("\"parallelism\": \"17\""));
}
catch (Exception e) {
e.printStackTrace();
fail("Program caused an exception: " + e.getMessage());
+ } finally {
+ restoreStdOut();
}
}
-
- // --------------------------------------------------------------------------------------------
-
- private static final class InfoTestCliFrontend extends CliFrontend {
-
- private final int expectedDop;
-
- public InfoTestCliFrontend(int expectedDop) throws Exception {
- super(CliFrontendTestUtils.getConfigDir());
- this.expectedDop = expectedDop;
- }
-
- @Override
- protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
- throws Exception {
- Configuration config = new Configuration();
- config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getLocalHost().getHostName());
- config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6176);
-
- return new TestClient(config, expectedDop);
- }
+ private static void replaceStdOut() {
+ stdOut = System.out;
+ buffer = new ByteArrayOutputStream();
+ capture = new PrintStream(buffer);
+ System.setOut(capture);
}
-
- private static final class TestClient extends Client {
-
- private final int expectedDop;
-
- private TestClient(Configuration config, int expectedDop) throws Exception {
- super(config, CliFrontendInfoTest.class.getClassLoader(), -1);
-
- this.expectedDop = expectedDop;
- }
-
- @Override
- public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism)
- throws CompilerException, ProgramInvocationException
- {
- assertEquals(this.expectedDop, parallelism);
- return "";
- }
+
+ private static void restoreStdOut() {
+ System.setOut(stdOut);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index c9ce12b..1718ba5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -38,6 +38,9 @@ import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -294,11 +297,10 @@ public class CliFrontendPackageProgramTest {
assertArrayEquals(progArgs, prog.getArguments());
Configuration c = new Configuration();
- c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
- Client cli = new Client(c, getClass().getClassLoader());
-
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
+
// we expect this to fail with a "ClassNotFoundException"
- cli.getOptimizedPlanAsJson(prog, 666);
+ Client.getOptimizedPlanAsJson(compiler, prog, 666);
fail("Should have failed with a ClassNotFoundException");
}
catch (ProgramInvocationException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index a7944ce..f910312 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -22,10 +22,12 @@ package org.apache.flink.client;
import static org.apache.flink.client.CliFrontendTestUtils.*;
import static org.junit.Assert.*;
+import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
public class CliFrontendRunTest {
@@ -44,16 +46,16 @@ public class CliFrontendRunTest {
String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"};
CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
int retCode = testFrontend.run(parameters);
- assertTrue(retCode != 0);
+ assertNotEquals(0, retCode);
}
-
+
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
assertEquals(0, testFrontend.run(parameters));
}
-
+
// test configure parallelism
{
String[] parameters = {"-v", "-p", "42", getTestJarPath()};
@@ -72,14 +74,14 @@ public class CliFrontendRunTest {
{
String[] parameters = {"-v", "-p", "text", getTestJarPath()};
CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- assertTrue(0 != testFrontend.run(parameters));
+ assertNotEquals(0, testFrontend.run(parameters));
}
-
+
// test configure parallelism with overflow integer value
{
String[] parameters = {"-v", "-p", "475871387138", getTestJarPath()};
CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- assertTrue(0 != testFrontend.run(parameters));
+ assertNotEquals(0, testFrontend.run(parameters));
}
}
catch (Exception e) {
@@ -87,7 +89,7 @@ public class CliFrontendRunTest {
fail(e.getMessage());
}
}
-
+
// --------------------------------------------------------------------------------------------
public static final class RunTestingCliFrontend extends CliFrontend {
@@ -102,10 +104,20 @@ public class CliFrontendRunTest {
}
@Override
- protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
+ protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
assertEquals(this.expectedParallelism, parallelism);
- assertEquals(client.getPrintStatusDuringExecution(), sysoutLogging);
+ assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
+ return 0;
+ }
+
+ @Override
+ protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
return 0;
}
+
+ @Override
+ protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
+ return Mockito.mock(Client.class);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 7f67567..be3fe89 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.junit.Test;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -50,7 +50,7 @@ public class RemoteExecutorHostnameResolutionTest {
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
- catch (ProgramInvocationException e) {
+ catch (IOException e) {
// that is what we want!
assertTrue(e.getCause() instanceof UnknownHostException);
}
@@ -72,7 +72,7 @@ public class RemoteExecutorHostnameResolutionTest {
exec.executePlan(getProgram());
fail("This should fail with an ProgramInvocationException");
}
- catch (ProgramInvocationException e) {
+ catch (IOException e) {
// that is what we want!
assertTrue(e.getCause() instanceof UnknownHostException);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 1b9fd73..ef2161e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.Test;
@@ -80,7 +79,7 @@ public class ClientConnectionTest {
testFailureBehavior(unreachableEndpoint);
}
- private void testFailureBehavior(InetSocketAddress unreachableEndpoint) {
+ private void testFailureBehavior(final InetSocketAddress unreachableEndpoint) {
final Configuration config = new Configuration();
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s");
@@ -93,16 +92,13 @@ public class ClientConnectionTest {
JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(TestInvokable.class);
- final JobGraph jg = new JobGraph("Test Job", vertex);
- final Client client = new Client(config, getClass().getClassLoader(), -1);
-
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Thread invoker = new Thread("test invoker") {
@Override
public void run() {
try {
- client.run(jg, true);
+ new Client(config);
fail("This should fail with an exception since the JobManager is unreachable.");
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index bc898b3..621ef63 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -22,20 +22,25 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
+
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -45,78 +50,56 @@ import org.apache.flink.runtime.util.SerializedThrowable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import scala.Some;
-import scala.Tuple2;
+import java.util.Collections;
+
import java.util.UUID;
import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
/**
* Simple and maybe stupid test to check the {@link Client} class.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Client.class)
public class ClientTest {
private PackagedProgram program;
- private Optimizer compilerMock;
- private JobGraphGenerator generatorMock;
-
private Configuration config;
private ActorSystem jobManagerSystem;
- private JobGraph jobGraph = new JobGraph("test graph");
@Before
public void setUp() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
+
+ Plan plan = env.createProgramPlan();
+ JobWithJars jobWithJars = new JobWithJars(plan, Collections.<String>emptyList());
+
+ program = mock(PackagedProgram.class);
+ when(program.getPlanWithJars()).thenReturn(jobWithJars);
+
final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
- program = mock(PackagedProgram.class);
- compilerMock = mock(Optimizer.class);
- generatorMock = mock(JobGraphGenerator.class);
-
- JobWithJars planWithJarsMock = mock(JobWithJars.class);
- Plan planMock = mock(Plan.class);
- OptimizedPlan optimizedPlanMock = mock(OptimizedPlan.class);
-
- when(planMock.getJobName()).thenReturn("MockPlan");
-
- when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
- when(planWithJarsMock.getPlan()).thenReturn(planMock);
-
- whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(Configuration.class)).thenReturn(this.compilerMock);
- when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
-
- whenNew(JobGraphGenerator.class).withAnyArguments().thenReturn(generatorMock);
- when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraph);
-
try {
- Tuple2<String, Object> address = new Tuple2<String, Object>("localhost", freePort);
- jobManagerSystem = AkkaUtils.createActorSystem(config, new Some<Tuple2<String, Object>>(address));
+ scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
+ jobManagerSystem = AkkaUtils.createActorSystem(config, new scala.Some<scala.Tuple2<String, Object>>(address));
}
catch (Exception e) {
e.printStackTrace();
@@ -145,15 +128,12 @@ public class ClientTest {
try {
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
- Client out = new Client(config, getClass().getClassLoader());
- JobSubmissionResult result = out.run(program.getPlanWithJars(), -1, false);
+ Client out = new Client(config);
+ JobSubmissionResult result = out.runDetached(program.getPlanWithJars(), 1);
assertNotNull(result);
program.deleteExtractedLibraries();
-
- verify(this.compilerMock, times(1)).compile(any(Plan.class));
- verify(this.generatorMock, times(1)).compileJobGraph(any(OptimizedPlan.class));
}
catch (Exception e) {
e.printStackTrace();
@@ -169,10 +149,10 @@ public class ClientTest {
try {
jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
- Client out = new Client(config, getClass().getClassLoader());
+ Client out = new Client(config);
try {
- out.run(program.getPlanWithJars(), -1, false);
+ out.runDetached(program.getPlanWithJars(), 1);
fail("This should fail with an exception");
}
catch (ProgramInvocationException e) {
@@ -181,9 +161,6 @@ public class ClientTest {
catch (Exception e) {
fail("wrong exception " + e);
}
-
- verify(this.compilerMock, times(1)).compile(any(Plan.class));
- verify(this.generatorMock, times(1)).compileJobGraph(any(OptimizedPlan.class));
}
catch (Exception e) {
e.printStackTrace();
@@ -198,10 +175,10 @@ public class ClientTest {
@Test
public void tryLocalExecution() {
try {
+ jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+
PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
-
when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
-
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -211,7 +188,7 @@ public class ClientTest {
}).when(packagedProgramMock).invokeInteractiveModeForExecution();
try {
- new Client(config, getClass().getClassLoader()).run(packagedProgramMock, 1, true);
+ new Client(config).runBlocking(packagedProgramMock, 1);
fail("Creating the local execution environment should not be possible");
}
catch (InvalidProgramException e) {
@@ -224,6 +201,34 @@ public class ClientTest {
}
}
+ @Test
+ public void testGetExecutionPlan() {
+ try {
+ jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
+
+ PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
+ assertNotNull(prg.getPreviewPlan());
+
+ Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+ OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, 1);
+ assertNotNull(op);
+
+ PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+ assertNotNull(dumper.getOptimizerPlanAsJSON(op));
+
+ // test HTML escaping
+ PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
+ dumper2.setEncodeForHTML(true);
+ String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
+
+ assertEquals(-1, htmlEscaped.indexOf('\\'));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
public static class SuccessReturningActor extends FlinkUntypedActor {
@@ -273,4 +278,33 @@ public class ClientTest {
return leaderSessionID;
}
}
+
+ public static class TestOptimizerPlan implements ProgramDescription {
+
+ @SuppressWarnings("serial")
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
+ .fieldDelimiter("\t").types(Long.class, Long.class);
+
+ DataSet<Tuple2<Long, Long>> result = input.map(
+ new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+ public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
+ return new Tuple2<Long, Long>(value.f0, value.f1+1);
+ }
+ });
+ result.writeAsCsv(args[1], "\n", "\t");
+ env.execute();
+ }
+ @Override
+ public String getDescription() {
+ return "TestOptimizerPlan <input-file-path> <output-file-path>";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index f156f77..116c1e6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.CliFrontendTestUtils;
-import org.junit.BeforeClass;
+
import org.junit.Test;
import static org.junit.Assert.*;
@@ -32,14 +31,10 @@ import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
- @BeforeClass
- public static void suppressOutput() {
- CliFrontendTestUtils.pipeSystemOutToNull();
- }
-
@Test
public void testExecuteAfterGetExecutionPlan() {
- ExecutionEnvironment env = new LocalEnvironment();
+ ExecutionEnvironment env = new LocalEnvironment();
+ env.getConfig().disableSysoutLogging();
DataSet<Integer> baseSet = env.fromElements(1, 2);
@@ -51,7 +46,9 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
try {
env.getExecutionPlan();
env.execute();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
+ e.printStackTrace();
fail("Cannot run both #getExecutionPlan and #execute.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index d1e971f..be2caaf 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -29,6 +29,9 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
@@ -49,9 +52,9 @@ public class ExecutionPlanCreationTest {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
-
- Client client = new Client(config, getClass().getClassLoader(), -1);
- OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
+
+ Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+ OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, -1);
assertNotNull(op);
PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 1a9f17f..95506f4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.PrintStream;
import org.apache.flink.client.CliFrontendTestUtils;
-import org.apache.flink.client.program.PackagedProgram;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 99e4906..7078e90 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -31,6 +31,7 @@ import backtype.storm.generated.NotAliveException;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
+import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -55,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -174,26 +174,26 @@ public class FlinkClient {
throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
}
- final List<File> jarFiles = new ArrayList<File>();
- jarFiles.add(uploadedJarFile);
-
final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
final Configuration configuration = jobGraph.getJobConfiguration();
- final Client client;
-
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
- client = new Client(
- configuration,
- JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()),
- -1);
+ final Client client;
+ try {
+ client = new Client(configuration);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not establish a connection to the job manager", e);
+ }
try {
- client.run(jobGraph, false);
+ ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
+ Lists.newArrayList(uploadedJarFile),
+ this.getClass().getClassLoader());
+ client.runDetached(jobGraph, classLoader);
} catch (final ProgramInvocationException e) {
throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index bf06c75..92d2b98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -29,7 +30,7 @@ public class JobExecutionResult extends JobSubmissionResult {
private long netRuntime;
- private Map<String, Object> accumulatorResults;
+ private Map<String, Object> accumulatorResults = Collections.emptyMap();
/**
* Creates a new JobExecutionResult.
@@ -41,7 +42,10 @@ public class JobExecutionResult extends JobSubmissionResult {
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
super(jobID);
this.netRuntime = netRuntime;
- this.accumulatorResults = accumulators;
+
+ if (accumulators != null) {
+ this.accumulatorResults = accumulators;
+ }
}
/**
@@ -106,4 +110,13 @@ public class JobExecutionResult extends JobSubmissionResult {
}
return (Integer) result;
}
+
+ /**
+ * Returns a dummy object for wrapping a JobSubmissionResult
+ * @param result The SubmissionResult
+ * @return a JobExecutionResult
+ */
+ public static JobExecutionResult fromJobSubmissionResult(JobSubmissionResult result) {
+ return new JobExecutionResult(result.getJobID(), -1, null);
+ }
}
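The new factory method pairs with detached submissions, which only know the JobID. A small sketch using only the constructors and accessors visible in this commit:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;

public final class DetachedResultSketch {
    public static void main(String[] args) {
        // A detached submission yields only a JobID; the wrapper carries a net
        // runtime of -1 and, with the change above, an empty accumulator map.
        JobSubmissionResult submission = new JobSubmissionResult(JobID.generate());
        JobExecutionResult dummy = JobExecutionResult.fromJobSubmissionResult(submission);
        System.out.println(dummy.getJobID() + " -> net runtime " + dummy.getNetRuntime() + " ms (dummy)");
    }
}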
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
index 7478da4..13a1a32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
@@ -15,35 +15,75 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common;
-import javax.xml.bind.DatatypeConverter;
import org.apache.flink.util.AbstractID;
+import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
/**
- * Unique Job Identifier
+ * Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
+ * to dataflow graphs.
+ *
+ * <p>Jobs act simultaneously as <i>sessions</i>, because jobs can be created and submitted
+ * incrementally in different parts. Newer fragments of a graph can be attached to existing
+ * graphs, thereby extending the current data flow graphs.</p>
*/
public final class JobID extends AbstractID {
private static final long serialVersionUID = 1L;
-
+
+ /**
+ * Creates a new (statistically) random JobID.
+ */
public JobID() {
super();
}
+ /**
+ * Creates a new JobID, using the given lower and upper parts.
+ *
+ * @param lowerPart The lower 8 bytes of the ID.
+ * @param upperPart The upper 8 bytes of the ID.
+ */
public JobID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}
+ /**
+ * Creates a new JobID from the given byte sequence. The byte sequence must be
+ * exactly 16 bytes long. The first eight bytes make up the lower part of the ID,
+ * while the next 8 bytes make up the upper part of the ID.
+ *
+ * @param bytes The byte sequence.
+ */
public JobID(byte[] bytes) {
super(bytes);
}
+
+ // ------------------------------------------------------------------------
+ // Static factory methods
+ // ------------------------------------------------------------------------
+ /**
+ * Creates a new (statistically) random JobID.
+ *
+ * @return A new random JobID.
+ */
public static JobID generate() {
return new JobID();
}
+ /**
+ * Creates a new JobID from the given byte sequence. The byte sequence must be
+ * exactly 16 bytes long. The first eight bytes make up the lower part of the ID,
+ * while the next 8 bytes make up the upper part of the ID.
+ *
+ * @param bytes The byte sequence.
+ *
+ * @return A new JobID corresponding to the ID encoded in the bytes.
+ */
public static JobID fromByteArray(byte[] bytes) {
return new JobID(bytes);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index 5cea9d5..3a18eb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.common;
/**
- * The result of a job submission.
- * Contains the JobID
+ * The result of submitting a job to a JobManager.
*/
public class JobSubmissionResult {
+
private JobID jobID;
public JobSubmissionResult(JobID jobID) {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e07ea45..e0d1eb8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -43,12 +43,14 @@ import org.apache.flink.util.Visitable;
import org.apache.flink.util.Visitor;
/**
- * This class encapsulates a single job (an instantiated data flow), together with some parameters.
- * Parameters include the name and a default parallelism. The job is referenced by the data sinks,
- * from which a traversal reaches all connected nodes of the job.
+ * This class represents Flink programs, in the form of dataflow plans.
+ *
+ * <p>The dataflow is referenced by the data sinks, from which all connected
+ * operators of the data flow can be reached via backwards traversal</p>.
*/
public class Plan implements Visitable<Operator<?>> {
+ /** The default parallelism indicates to use the cluster's default */
private static final int DEFAULT_PARALELLISM = -1;
/**
@@ -57,34 +59,31 @@ public class Plan implements Visitable<Operator<?>> {
*/
protected final List<GenericDataSinkBase<?>> sinks = new ArrayList<GenericDataSinkBase<?>>(4);
- /**
- * The name of the job.
- */
+ /** The name of the job. */
protected String jobName;
- /**
- * The default parallelism to use for nodes that have no explicitly specified parallelism.
- */
+ /** The default parallelism to use for nodes that have no explicitly specified parallelism. */
protected int defaultParallelism = DEFAULT_PARALELLISM;
- /**
- * Hash map for files in the distributed cache: registered name to cache entry.
- */
+ /** Hash map for files in the distributed cache: registered name to cache entry. */
protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>();
+
+ /** Config object for runtime execution parameters. */
+ protected ExecutionConfig executionConfig;
- /**
- * Config object for runtime execution parameters.
- */
- protected ExecutionConfig executionConfig = null;
+ /** The ID of the Job that this dataflow plan belongs to */
+ private JobID jobId;
+
+ private long sessionTimeout;
// ------------------------------------------------------------------------
/**
- * Creates a new program plan with the given name, describing the data flow that ends at the
+ * Creates a new dataflow plan with the given name, describing the data flow that ends at the
* given data sinks.
- * <p>
- * If not all of the sinks of a data flow are given to the plan, the flow might
- * not be translated entirely.
+ *
+ * <p>If not all of the sinks of a data flow are given to the plan, the flow might
+ * not be translated entirely.</p>
*
* @param sinks The collection will the sinks of the job's data flow.
* @param jobName The name to display for the job.
@@ -238,7 +237,37 @@ public class Plan implements Visitable<Operator<?>> {
checkNotNull(jobName, "The job name must not be null.");
this.jobName = jobName;
}
-
+
+ /**
+ * Gets the ID of the job that the dataflow plan belongs to.
+ * If this ID is not set, then the dataflow represents its own
+ * independent job.
+ *
+ * @return The ID of the job that the dataflow plan belongs to.
+ */
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Sets the ID of the job that the dataflow plan belongs to.
+ * If this ID is set to {@code null}, then the dataflow represents its own
+ * independent job.
+ *
+ * @param jobId The ID of the job that the dataflow plan belongs to.
+ */
+ public void setJobId(JobID jobId) {
+ this.jobId = jobId;
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
/**
* Gets the default parallelism for this job. That degree is always used when an operator
* is not explicitly given a parallelism.
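As a hedged illustration of the new session fields on Plan (the environment-level wiring lives in the ExecutionEnvironment/LocalEnvironment/RemoteEnvironment hunks of this commit; the helper below is hypothetical):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;

public final class SessionPlanSketch {
    // Attach a dataflow plan to an existing session so that newly submitted
    // parts extend the already running job rather than starting a fresh one.
    public static void attachToSession(Plan plan, JobID sessionId, long timeoutSeconds) {
        plan.setJobId(sessionId);               // null would mean "independent job"
        plan.setSessionTimeout(timeoutSeconds); // how long the session is kept alive
    }
}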
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 1294011..514692f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common;
import org.apache.flink.configuration.Configuration;
@@ -26,14 +25,20 @@ import java.util.Collections;
import java.util.List;
/**
- * A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan.
+ * A PlanExecutor executes a Flink program's dataflow plan. All Flink programs are translated to
+ * dataflow plans prior to execution.
+ *
+ * <p>The specific implementation (such as the org.apache.flink.client.LocalExecutor
+ * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
+ * The concrete implementations of the executors are loaded dynamically, because they depend on
+ * the full set of all runtime classes.</p>
*
- * The concrete implementations are loaded dynamically, because they depend on the full set of
- * dependencies of all runtime classes.
+ * <p>PlanExecutors can be started explicitly, in which case they keep running until stopped. If
+ * a program is submitted to a plan executor that is not running, it will start up for that
+ * program, and shut down afterwards.</p>
*/
public abstract class PlanExecutor {
-
+
private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
@@ -43,21 +48,68 @@ public abstract class PlanExecutor {
/** If true, all execution progress updates are not only logged, but also printed to System.out */
private boolean printUpdatesToSysout = true;
-
+
+ /**
+ * Sets whether the executor should print progress results to "standard out" ({@link System#out}).
+ * All progress messages are logged using the configured logging framework independent of the value
+ * set here.
+ *
+ * @param printStatus True, to print progress updates to standard out, false to not do that.
+ */
public void setPrintStatusDuringExecution(boolean printStatus) {
this.printUpdatesToSysout = printStatus;
}
-
+
+ /**
+ * Gets whether the executor prints progress results to "standard out" ({@link System#out}).
+ *
+ * @return True, if the executor prints progress messages to standard out, false if not.
+ */
public boolean isPrintingStatusDuringExecution() {
return this.printUpdatesToSysout;
}
+
+ // ------------------------------------------------------------------------
+ // Startup & Shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Starts the program executor. After the executor has been started, it will keep
+ * running until {@link #stop()} is called.
+ *
+ * @throws Exception Thrown, if the executor startup failed.
+ */
+ public abstract void start() throws Exception;
+
+ /**
+ * Shuts down the plan executor and releases all local resources.
+ *
+ * <p>This method also ends all sessions created by this executor. Remote job executions
+ * may complete, but the session is not kept alive after that.</p>
+ *
+ * @throws Exception Thrown, if the proper shutdown failed.
+ */
+ public abstract void stop() throws Exception;
+
+ /**
+ * Checks if this executor is currently running.
+ *
+ * @return True if the executor is running, false otherwise.
+ */
+ public abstract boolean isRunning();
// ------------------------------------------------------------------------
// Program Execution
// ------------------------------------------------------------------------
/**
- * Execute the given plan and return the runtime in milliseconds.
+ * Execute the given program.
+ *
+ * <p>If the executor has not been started before, then this method will start the
+ * executor and stop it after the execution has completed. This implies that one needs
+ * to explicitly start the executor for all programs where multiple dataflow parts
+ * depend on each other. Otherwise, the previous parts will no longer
+ * be available, because the executor immediately shuts down after the execution.</p>
*
* @param plan The plan of the program to execute.
* @return The execution result, containing for example the net runtime of the program, and the accumulators.
@@ -66,7 +118,6 @@ public abstract class PlanExecutor {
*/
public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
-
/**
* Gets the programs execution plan in a JSON format.
*
@@ -77,7 +128,17 @@ public abstract class PlanExecutor {
*/
public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
-
+ /**
+ * Ends the job session, identified by the given JobID. Jobs can be kept around as sessions,
+ * if a session timeout is specified. Keeping Jobs as sessions allows users to incrementally
+ * add new operations to their dataflow, that refer to previous intermediate results of the
+ * dataflow.
+ *
+ * @param jobID The JobID identifying the job session.
+ * @throws Exception Thrown, if the message to finish the session cannot be delivered.
+ */
+ public abstract void endSession(JobID jobID) throws Exception;
+
// ------------------------------------------------------------------------
// Executor Factories
// ------------------------------------------------------------------------
@@ -102,7 +163,7 @@ public abstract class PlanExecutor {
/**
* Creates an executor that runs the plan on a remote environment. The remote executor is typically used
* to send the program to a cluster for execution.
- *
+ *
* @param hostname The address of the JobManager to send the program to.
* @param port The port of the JobManager to send the program to.
* @param clientConfiguration The configuration for the client (Akka, default.parallelism).
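The lifecycle spelled out by the new Javadoc, as a hedged driver sketch; only methods introduced or documented in this hunk are called, and the executor/plan arguments are assumed to be constructed elsewhere:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;

public final class ExecutorLifecycleSketch {
    // Keep one executor alive across two dependent dataflow parts, so the second
    // part can refer to intermediate results of the first, then end the session.
    public static void runTwoParts(PlanExecutor executor, Plan part1, Plan part2) throws Exception {
        executor.start();                            // stays up until stop() is called
        try {
            JobExecutionResult first = executor.executePlan(part1);
            JobExecutionResult second = executor.executePlan(part2);
            System.out.println("net runtime: " + (first.getNetRuntime() + second.getNetRuntime()) + " ms");
            executor.endSession(second.getJobID());  // release the session's intermediate results
        }
        finally {
            executor.stop();
        }
    }
}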
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index 51e91d7..dbb7cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -52,4 +52,8 @@ public class CollectionEnvironment extends ExecutionEnvironment {
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
}
+
+ @Override
+ public void startNewSession() throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 64b3a1d..23b5a57 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -26,11 +26,11 @@ import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.io.FileInputFormat;
@@ -94,7 +94,8 @@ import com.google.common.base.Preconditions;
*/
public abstract class ExecutionEnvironment {
- private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
+ /** The logger used by the environment and its subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
/** The environment of the context (local by default, cluster if invoked through command line) */
private static ExecutionEnvironmentFactory contextEnvironmentFactory;
@@ -106,34 +107,42 @@ public abstract class ExecutionEnvironment {
private static boolean allowLocalExecution = true;
// --------------------------------------------------------------------------------------------
-
- private final UUID executionId;
-
+
private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
private final ExecutionConfig config = new ExecutionConfig();
- /** Result from the latest execution, to be make it retrievable when using eager execution methods */
+ /** Result from the latest execution, to make it retrievable when using eager execution methods */
protected JobExecutionResult lastJobExecutionResult;
+
+ /** The ID of the session, defined by this execution environment. Sessions and jobs are the same in
+ * Flink, as jobs can consist of multiple parts that are attached to the growing dataflow graph */
+ protected JobID jobID;
+
+ /** The session timeout in seconds */
+ protected long sessionTimeout;
/** Flag to indicate whether sinks have been cleared in previous executions */
private boolean wasExecuted = false;
-
- // --------------------------------------------------------------------------------------------
- // Constructor and Properties
- // --------------------------------------------------------------------------------------------
+
/**
* Creates a new Execution Environment.
*/
protected ExecutionEnvironment() {
- this.executionId = UUID.randomUUID();
+ jobID = JobID.generate();
}
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
/**
- * Gets the config object.
+ * Gets the config object that defines execution parameters.
+ *
+ * @return The environment's execution configuration.
*/
public ExecutionConfig getConfig() {
return config;
@@ -228,41 +237,72 @@ public abstract class ExecutionEnvironment {
}
/**
- * Gets the UUID by which this environment is identified. The UUID sets the execution context
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+ *
+ * @return The execution result from the latest job execution.
+ */
+ public JobExecutionResult getLastJobExecutionResult(){
+ return this.lastJobExecutionResult;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Session Management
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Gets the JobID by which this environment is identified. The JobID sets the execution context
* in the cluster or local environment.
*
- * @return The UUID of this environment.
+ * @return The JobID of this environment.
* @see #getIdString()
*/
- public UUID getId() {
- return this.executionId;
+ public JobID getId() {
+ return this.jobID;
}
/**
- * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
- *
- * @return The execution result from the latest job execution.
+ * Gets the JobID by which this environment is identified, as a string.
+ *
+ * @return The JobID as a string.
+ * @see #getId()
*/
- public JobExecutionResult getLastJobExecutionResult(){
- return this.lastJobExecutionResult;
+ public String getIdString() {
+ return this.jobID.toString();
}
+ /**
+ * Sets the session timeout to hold the intermediate results of a job. The updated
+ * timeout only applies to future executions.
+ *
+ * @param timeout The timeout, in seconds.
+ */
+ public void setSessionTimeout(long timeout) {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("The session timeout must not be less than zero.");
+ }
+ this.sessionTimeout = timeout;
+ }
/**
- * Gets the UUID by which this environment is identified, as a string.
+ * Gets the session timeout for this environment. The session timeout defines for how long
+ * after an execution, the job and its intermediate results will be kept for future
+ * interactions.
*
- * @return The UUID as a string.
- * @see #getId()
+ * @return The session timeout, in seconds.
*/
- public String getIdString() {
- return this.executionId.toString();
+ public long getSessionTimeout() {
+ return sessionTimeout;
}
+ /**
+ * Starts a new session, discarding the previous data flow and all of its intermediate results.
+ */
+ public abstract void startNewSession() throws Exception;
+
// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
-
/**
* Adds a new Kryo default serializer to the Runtime.
*
@@ -944,7 +984,7 @@ public abstract class ExecutionEnvironment {
}
if(typeInfo instanceof CompositeType) {
List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>();
- Utils.getContainedGenericTypes((CompositeType)typeInfo, genericTypesInComposite);
+ Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
for(GenericTypeInfo<?> gt : genericTypesInComposite) {
Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
}
@@ -1176,4 +1216,5 @@ public abstract class ExecutionEnvironment {
public static boolean localExecutionIsAllowed() {
return allowLocalExecution;
}
+
}
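To make the session surface above concrete, here is an illustrative usage sketch from a user
program. The sink path is a placeholder; the methods used are the ones introduced or kept by this
diff (setSessionTimeout, getIdString, startNewSession) plus the existing data set API.

    // illustrative sketch, not from the diff
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setSessionTimeout(3600);                 // keep the job and intermediate results for one hour

    env.fromElements(1, 2, 3)
       .writeAsText("/tmp/session-part-1");      // placeholder output path
    env.execute("part one");

    System.out.println("session/job id: " + env.getIdString());

    // later parts submitted through the same environment share the JobID above,
    // until the session is explicitly restarted:
    env.startNewSession();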
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 27b6254..5fd272b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -20,49 +20,113 @@ package org.apache.flink.api.java;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
/**
* An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
- * environment is instantiated. When this environment is instantiated, it uses a default parallelism
- * of {@code 1}. Local environments can also be instantiated through
- * {@link ExecutionEnvironment#createLocalEnvironment()} and {@link ExecutionEnvironment#createLocalEnvironment(int)}.
- * The former version will pick a default parallelism equal to the number of hardware contexts in the local
- * machine.
+ * environment is instantiated.
+ *
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.</p>
+ *
+ * <p>Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()}
+ * and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
+ * default parallelism equal to the number of hardware contexts in the local machine.</p>
*/
public class LocalEnvironment extends ExecutionEnvironment {
+
+ /** The user-defined configuration for the local execution */
private Configuration configuration;
+
+ /** The local plan executor, created lazily upon first use */
+ private PlanExecutor executor;
+
+ /** In case we keep the executor alive for sessions, this reaper shuts it down eventually.
+ * The reaper's finalize method triggers the executor shutdown. */
+ @SuppressWarnings("all")
+ private ExecutorReaper executorReaper;
+
/**
* Creates a new local environment.
*/
public LocalEnvironment() {
- if(!ExecutionEnvironment.localExecutionIsAllowed()) {
+ if (!ExecutionEnvironment.localExecutionIsAllowed()) {
throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
}
+ this.configuration = new Configuration();
+ }
+
+ /**
+ * Sets a configuration used to configure the local Flink executor.
+ * If {@code null} is passed, then the default configuration will be used.
+ *
+ * @param customConfiguration The configuration to be used for the local execution.
+ */
+ public void setConfiguration(Configuration customConfiguration) {
+ this.configuration = customConfiguration != null ? customConfiguration : new Configuration();
}
// --------------------------------------------------------------------------------------------
@Override
public JobExecutionResult execute(String jobName) throws Exception {
+ if (executor == null) {
+ startNewSession();
+ }
+
Plan p = createProgramPlan(jobName);
-
- PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
- executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
- this.lastJobExecutionResult = executor.executePlan(p);
- return this.lastJobExecutionResult;
+ p.setJobId(jobID);
+ p.setSessionTimeout(sessionTimeout);
+
+ JobExecutionResult result = executor.executePlan(p);
+
+ this.lastJobExecutionResult = result;
+ return result;
}
@Override
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(null, false);
- PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
- return executor.getOptimizerPlanAsJSON(p);
+ // make sure that we do not start an executor here in any case.
+ // if one is running, fine; if not, we only create a throwaway executor to generate the plan
+ // and discard it immediately afterwards
+ if (executor != null) {
+ return executor.getOptimizerPlanAsJSON(p);
+ }
+ else {
+ PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
+ return tempExecutor.getOptimizerPlanAsJSON(p);
+ }
}
- // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void startNewSession() throws Exception {
+ if (executor != null) {
+ // we need to end the previous session
+ executor.stop();
+ // also create a new JobID
+ jobID = JobID.generate();
+ }
+
+ // create a new local executor
+ executor = PlanExecutor.createLocalExecutor(configuration);
+ executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+
+ // if we have a session, start the mini cluster eagerly to have it available across sessions
+ if (getSessionTimeout() > 0) {
+ executor.start();
+
+ // also install the reaper that will shut it down eventually
+ executorReaper = new ExecutorReaper(executor);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
@Override
public String toString() {
@@ -70,7 +134,91 @@ public class LocalEnvironment extends ExecutionEnvironment {
+ ") : " + getIdString();
}
- public void setConfiguration(Configuration customConfiguration) {
- this.configuration = customConfiguration;
+ // ------------------------------------------------------------------------
+ // Reaping the local executor when in session mode
+ // ------------------------------------------------------------------------
+
+ /**
+ * This thread shuts down the local executor.
+ *
+ * <p><b>IMPORTANT:</b> This must be a static inner class to hold no reference to the outer class.
+ * Otherwise, the outer class could never become garbage collectible while this thread runs.</p>
+ */
+ private static class ShutdownThread extends Thread {
+
+ private final Object monitor = new Object();
+
+ private final PlanExecutor executor;
+
+ private volatile boolean running = true;
+ private volatile boolean triggered = false;
+
+ ShutdownThread(PlanExecutor executor) {
+ super("Local cluster reaper");
+ setDaemon(true);
+ setPriority(Thread.MIN_PRIORITY);
+
+ this.executor = executor;
+ }
+
+ @Override
+ public void run() {
+ synchronized (monitor) {
+ while (running && !triggered) {
+ try {
+ monitor.wait();
+ }
+ catch (InterruptedException e) {
+ // should never happen
+ }
+ }
+ }
+
+ if (running && triggered) {
+ try {
+ executor.stop();
+ }
+ catch (Throwable t) {
+ System.err.println("Cluster reaper caught exception during shutdown");
+ t.printStackTrace();
+ }
+ }
+ }
+
+ void trigger() {
+ triggered = true;
+ synchronized (monitor) {
+ monitor.notifyAll();
+ }
+ }
+
+ void cancel() {
+ running = false;
+ synchronized (monitor) {
+ monitor.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * A class that, upon finalization, shuts down the local mini cluster by triggering the reaper
+ * thread.
+ */
+ private static class ExecutorReaper {
+
+ private final ShutdownThread shutdownThread;
+
+ ExecutorReaper(PlanExecutor executor) {
+ this.shutdownThread = new ShutdownThread(executor);
+ this.shutdownThread.start();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+
+ shutdownThread.trigger();
+ shutdownThread.cancel();
+ }
}
}
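Usage note on the session mode added to the LocalEnvironment above: with a session timeout set,
startNewSession() starts the mini cluster eagerly and the ExecutorReaper tears it down once the
environment becomes unreachable. An illustrative sketch (output paths are placeholders):

    // illustrative sketch, not from the diff
    LocalEnvironment env = new LocalEnvironment();
    env.setSessionTimeout(60);                  // > 0, so startNewSession() starts the mini cluster
    env.startNewSession();

    env.fromElements("a", "b", "c").writeAsText("/tmp/part-1");
    env.execute("part one");                    // reuses the already running executor

    env.fromElements("d", "e").writeAsText("/tmp/part-2");
    env.execute("part two");                    // same session, same JobID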
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 1f73e93..6ae1f26 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -19,30 +19,47 @@
package org.apache.flink.api.java;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
/**
- * An {@link ExecutionEnvironment} that sends programs
- * to a cluster for execution. Note that all file paths used in the program must be accessible from the
- * cluster. The execution will use the cluster's default parallelism, unless the parallelism is
- * set explicitly via {@link ExecutionEnvironment#setParallelism(int)}.
+ * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
+ * needs to be created with the address and port of the JobManager of the Flink cluster that
+ * should execute the programs.
+ *
+ * <p>Many programs executed via the remote environment depend on additional classes. Such classes
+ * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
+ * must be attached to the remote environment as JAR files, to allow the environment to ship the
+ * classes into the cluster for the distributed execution.</p>
*/
public class RemoteEnvironment extends ExecutionEnvironment {
+ /** The hostname of the JobManager */
protected final String host;
-
+
+ /** The port of the JobManager main actor system */
protected final int port;
-
+
+ /** The jar files that need to be attached to each job */
private final String[] jarFiles;
+ /** The remote executor lazily created upon first use */
+ private PlanExecutor executor;
+
private Configuration clientConfiguration;
+
+ /** Optional shutdown hook, used in session mode to eagerly terminate the last session */
+ private Thread shutdownHook;
+
/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
* given host name and port.
*
+ * <p>Each program execution will have all the given JAR files in its classpath.</p>
+ *
* @param host The host name or address of the master (JobManager), where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
@@ -53,45 +70,137 @@ public class RemoteEnvironment extends ExecutionEnvironment {
if (host == null) {
throw new NullPointerException("Host must not be null.");
}
-
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}
-
+
this.host = host;
this.port = port;
this.jarFiles = jarFiles;
}
-
-
+
+ // ------------------------------------------------------------------------
+
@Override
public JobExecutionResult execute(String jobName) throws Exception {
+ ensureExecutorCreated();
+
Plan p = createProgramPlan(jobName);
-
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
- executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+ p.setJobId(jobID);
+ p.setSessionTimeout(sessionTimeout);
- this.lastJobExecutionResult = executor.executePlan(p);
- return this.lastJobExecutionResult;
+ JobExecutionResult result = executor.executePlan(p);
+
+ this.lastJobExecutionResult = result;
+ return result;
}
-
+
@Override
public String getExecutionPlan() throws Exception {
- Plan p = createProgramPlan("unnamed", false);
- p.setDefaultParallelism(getParallelism());
- registerCachedFilesWithPlan(p);
+ Plan p = createProgramPlan("plan", false);
+
+ // make sure that we do not start a new executor here
+ // if one is running, fine; if not, we create a lightweight local executor and let it
+ // generate the plan
+ if (executor != null) {
+ return executor.getOptimizerPlanAsJSON(p);
+ }
+ else {
+ PlanExecutor le = PlanExecutor.createLocalExecutor(null);
+ return le.getOptimizerPlanAsJSON(p);
+ }
+ }
+
+ @Override
+ public void startNewSession() throws Exception {
+ dispose();
+ jobID = JobID.generate();
+ installShutdownHook();
+ }
+
+ private void ensureExecutorCreated() throws Exception {
+ if (executor == null) {
+ executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
+ executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+ }
- PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
- return executor.getOptimizerPlanAsJSON(p);
+ // if we are using sessions, we keep the executor running
+ if (getSessionTimeout() > 0 && !executor.isRunning()) {
+ executor.start();
+ installShutdownHook();
+ }
}
+ // ------------------------------------------------------------------------
+ // Dispose
+ // ------------------------------------------------------------------------
+
+ protected void dispose() {
+ // Remove shutdown hook to prevent resource leaks, unless this is invoked by the
+ // shutdown hook itself
+ if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException e) {
+ // race, JVM is in shutdown already, we can safely ignore this
+ }
+ catch (Throwable t) {
+ LOG.warn("Exception while unregistering the cleanup shutdown hook.");
+ }
+ }
+
+ try {
+ PlanExecutor executor = this.executor;
+ if (executor != null) {
+ executor.endSession(jobID);
+ executor.stop();
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed to end the session and stop the executor.", e);
+ }
+ }
+
@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
}
-
+
public void setClientConfiguration(Configuration clientConfiguration) {
this.clientConfiguration = clientConfiguration;
}
+
+ // ------------------------------------------------------------------------
+ // Shutdown hooks and reapers
+ // ------------------------------------------------------------------------
+
+ private void installShutdownHook() {
+ if (shutdownHook == null) {
+ Thread shutdownHook = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ dispose();
+ }
+ catch (Throwable t) {
+ LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(), t);
+ }
+ }
+ });
+
+ try {
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ this.shutdownHook = shutdownHook;
+ }
+ catch (IllegalStateException e) {
+ // JVM is already shutting down. no need for a shutdown hook
+ }
+ catch (Throwable t) {
+ LOG.error("Cannot register shutdown hook that cleanly terminates the BLOB service.");
+ }
+ }
+ }
}
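Analogously, the remote environment keeps its executor and session alive across executions and
cleans up via dispose() and the shutdown hook. A hedged sketch of the intended client-side usage;
host, port, jar and output paths are placeholders:

    // illustrative sketch, not from the diff
    RemoteEnvironment env = new RemoteEnvironment("jobmanager-host", 6123, "/path/to/program.jar");
    env.setSessionTimeout(600);       // keep the session at the cluster for ten minutes
    env.startNewSession();            // generates the JobID and installs the cleanup shutdown hook

    env.fromElements(1, 2, 3).writeAsText("hdfs:///tmp/part-1");
    env.execute("part one");

    env.fromElements(4, 5, 6).writeAsText("hdfs:///tmp/part-2");
    env.execute("part two");          // same JobID, attaches to the session kept by the JobManager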
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
index d56be87..311c286 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -27,9 +27,10 @@ import org.apache.flink.util.Visitor;
/**
* The execution plan generated by the Optimizer. It contains {@link PlanNode}s
* and {@link Channel}s that describe exactly how the program should be executed.
- * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
- * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
- * and the data exchange modes (batched, pipelined).
+ *
+ * <p>The optimized plan defines all ship strategies (local pipe, shuffle, broadcast, rebalance),
+ * all operator strategies (sorting-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).</p>
*/
public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
@@ -42,7 +43,7 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
/** All nodes in the optimizer plan. */
private final Collection<PlanNode> allNodes;
- /** The original program. */
+ /** The original program (as a dataflow plan). */
private final Plan originalProgram;
/** Name of the job */
@@ -104,11 +105,11 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
}
/**
- * Gets the original program plan from which this optimized plan was created.
+ * Gets the original program's dataflow plan from which this optimized plan was created.
*
- * @return The original program plan.
+ * @return The original program's dataflow plan.
*/
- public Plan getOriginalPactPlan() {
+ public Plan getOriginalPlan() {
return this.originalProgram;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 0cbcea8..3567fad 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer.plantranslate;
import com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -159,9 +160,21 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
* {@link org.apache.flink.runtime.jobgraph.JobGraph}.
*
* @param program Optimized plan that is translated into a JobGraph.
- * @return JobGraph generated frmo the plan.
+ * @return JobGraph generated from the plan.
*/
public JobGraph compileJobGraph(OptimizedPlan program) {
+ return compileJobGraph(program, null);
+ }
+
+ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
+ if (program == null) {
+ throw new NullPointerException();
+ }
+
+ if (jobId == null) {
+ jobId = JobID.generate();
+ }
+
this.vertices = new HashMap<PlanNode, JobVertex>();
this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
this.chainedTasksInSequence = new ArrayList<TaskInChain>();
@@ -204,9 +217,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// ----------- finalize the job graph -----------
// create the job graph object
- JobGraph graph = new JobGraph(program.getJobName());
- graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
+ JobGraph graph = new JobGraph(jobId, program.getJobName());
+ graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
graph.setAllowQueuedScheduling(false);
+ graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
// add vertices to the graph
for (JobVertex vertex : this.vertices.values()) {
@@ -219,13 +233,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
// add registered cache file into job configuration
- for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+ for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
}
try {
InstantiationUtil.writeObjectToConfig(
- program.getOriginalPactPlan().getExecutionConfig(),
+ program.getOriginalPlan().getExecutionConfig(),
graph.getJobConfiguration(),
ExecutionConfig.CONFIG_KEY);
} catch (IOException e) {
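The generator above now threads an explicit JobID into the JobGraph, so that repeated submissions
within one session target the same job at the JobManager. A rough sketch of the new entry point;
optimizedPlan is assumed to come from the optimizer as usual:

    // illustrative sketch, not from the diff
    JobGraphGenerator generator = new JobGraphGenerator();
    JobID sessionJobId = JobID.generate();               // normally the session JobID from the Plan
    JobGraph jobGraph = generator.compileJobGraph(optimizedPlan, sessionJobId);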
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
index a685ff4..3180ab4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
@@ -75,7 +75,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
@Override
public void postPass(OptimizedPlan plan) {
- executionConfig = plan.getOriginalPactPlan().getExecutionConfig();
+ executionConfig = plan.getOriginalPlan().getExecutionConfig();
for (SinkPlanNode sink : plan.getDataSinks()) {
traverse(sink);
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9d64866..c51bc7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -122,7 +122,6 @@ public class JobClient {
* @param jobGraph JobGraph describing the Flink job
* @param timeout Timeout for futures
* @param sysoutLogUpdates prints log updates to system out if true
- * @param userCodeClassloader class loader to be used for deserialization
* @return The job execution result
* @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
* execution fails.
@@ -133,7 +132,7 @@ public class JobClient {
JobGraph jobGraph,
FiniteDuration timeout,
boolean sysoutLogUpdates,
- ClassLoader userCodeClassloader) throws JobExecutionException {
+ ClassLoader classLoader) throws JobExecutionException {
checkNotNull(actorSystem, "The actorSystem must not be null.");
checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
@@ -182,7 +181,7 @@ public class JobClient {
SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
if (result != null) {
try {
- return result.toJobExecutionResult(userCodeClassloader);
+ return result.toJobExecutionResult(classLoader);
}
catch (Throwable t) {
throw new JobExecutionException(jobGraph.getJobID(),
@@ -199,7 +198,7 @@ public class JobClient {
SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
if (serThrowable != null) {
- Throwable cause = serThrowable.deserializeError(userCodeClassloader);
+ Throwable cause = serThrowable.deserializeError(classLoader);
if (cause instanceof JobExecutionException) {
throw (JobExecutionException) cause;
}
@@ -230,7 +229,7 @@ public class JobClient {
ActorGateway jobManagerGateway,
JobGraph jobGraph,
FiniteDuration timeout,
- ClassLoader userCodeClassloader) throws JobExecutionException {
+ ClassLoader classLoader) throws JobExecutionException {
checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
checkNotNull(jobGraph, "The jobGraph must not be null.");
@@ -269,7 +268,7 @@ public class JobClient {
else if (result instanceof JobManagerMessages.JobResultFailure) {
try {
SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
- throw t.deserializeError(userCodeClassloader);
+ throw t.deserializeError(classLoader);
}
catch (JobExecutionException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 7871a8c..7c6a4af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
/**
* This exception is the base exception for all exceptions that denote any failure during
- * teh execution of a job. The JobExecutionException and its subclasses are thrown by
+ * the execution of a job. The JobExecutionException and its subclasses are thrown by
* the {@link JobClient}.
*/
public class JobExecutionException extends Exception {
[2/4] flink git commit: [FLINK-2097][core] implement a job session
management
Posted by mx...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 70a49fd..889ce43 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,6 +178,9 @@ public class ExecutionGraph implements Serializable {
/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
private boolean snapshotCheckpointsEnabled;
+
+ /** Flag to indicate whether the Graph has been archived */
+ private boolean isArchived = false;
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
@@ -326,6 +329,9 @@ public class ExecutionGraph implements Serializable {
return scheduleMode;
}
+ public boolean isArchived() {
+ return isArchived;
+ }
public void enableSnapshotCheckpointing(
long interval,
long checkpointTimeout,
@@ -779,6 +785,8 @@ public class ExecutionGraph implements Serializable {
requiredJarFiles.clear();
jobStatusListenerActors.clear();
executionListenerActors.clear();
+
+ isArchived = true;
}
public ExecutionConfig getExecutionConfig() {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7677a1b..e4a0209 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -70,15 +70,19 @@ public class JobGraph implements Serializable {
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-
- /** ID of this job. */
+
+ /** ID of this job. May be set explicitly if a specific job id is desired (e.g. for session management) */
private final JobID jobID;
/** Name of this job. */
- private String jobName;
+ private final String jobName;
/** The number of times that failed tasks should be re-executed */
private int numExecutionRetries;
+
+ /** The number of seconds that the corresponding ExecutionGraph is kept at the
+ * job manager after the job has finished, before it is removed. */
+ private long sessionTimeout = 0;
/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;
@@ -86,7 +90,7 @@ public class JobGraph implements Serializable {
/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
- /** The settings for asynchronous snapshotting */
+ /** The settings for asynchronous snapshots */
private JobSnapshottingSettings snapshotSettings;
// --------------------------------------------------------------------------------------------
@@ -99,19 +103,19 @@ public class JobGraph implements Serializable {
}
/**
- * Constructs a new job graph with the given name and a random job ID.
+ * Constructs a new job graph with the given name and a random job ID.
*
* @param jobName The name of the job
*/
public JobGraph(String jobName) {
this(null, jobName);
}
-
+
/**
- * Constructs a new job graph with the given name and a random job ID.
+ * Constructs a new job graph with the given name and a random job ID if {@code null} is supplied as an id.
*
- * @param jobId The id of the job
- * @param jobName The name of the job
+ * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
+ * @param jobName The name of the job.
*/
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
@@ -119,7 +123,7 @@ public class JobGraph implements Serializable {
}
/**
- * Constructs a new job graph with no name and a random job ID.
+ * Constructs a new job graph with no name and a random job ID if {@code null} is supplied as an id.
*
* @param vertices The vertices to add to the graph.
*/
@@ -138,9 +142,9 @@ public class JobGraph implements Serializable {
}
/**
- * Constructs a new job graph with the given name and a random job ID.
+ * Constructs a new job graph with the given name and a random job ID if {@code null} is supplied as an id.
*
- * @param jobId The id of the job.
+ * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
* @param jobName The name of the job.
* @param vertices The vertices to add to the graph.
*/
@@ -162,7 +166,7 @@ public class JobGraph implements Serializable {
public JobID getJobID() {
return this.jobID;
}
-
+
/**
* Returns the name assigned to the job graph.
*
@@ -173,9 +177,10 @@ public class JobGraph implements Serializable {
}
/**
- * Returns the configuration object for this job if it is set.
+ * Returns the configuration object for this job. Job-wide parameters should be set into that
+ * configuration object.
*
- * @return the configuration object for this job, or <code>null</code> if it is not set
+ * @return The configuration object for this job.
*/
public Configuration getJobConfiguration() {
return this.jobConfiguration;
@@ -190,7 +195,8 @@ public class JobGraph implements Serializable {
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
- throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ throw new IllegalArgumentException(
+ "The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numExecutionRetries = numberOfExecutionRetries;
}
@@ -205,11 +211,29 @@ public class JobGraph implements Serializable {
public int getNumberOfExecutionRetries() {
return numExecutionRetries;
}
+
+ /**
+ * Gets the timeout after which the corresponding ExecutionGraph is removed at the
+ * job manager once the job has finished.
+ * @return The session timeout, in seconds.
+ */
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ /**
+ * Sets the timeout of the session in seconds. The timeout specifies how long a job will be kept
+ * in the job manager after it finishes.
+ * @param sessionTimeout The timeout in seconds
+ */
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
this.allowQueuedScheduling = allowQueuedScheduling;
}
-
+
public boolean getAllowQueuedScheduling() {
return allowQueuedScheduling;
}
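For orientation, a small sketch of how a JobGraph carrying a session looks when assembled directly.
The job vertices are omitted; only fields touched by this diff are shown:

    // illustrative sketch, not from the diff
    JobID sessionId = JobID.generate();

    JobGraph graph = new JobGraph(sessionId, "session job");
    graph.setSessionTimeout(120);            // keep the ExecutionGraph at the JobManager for 2 minutes
    graph.setNumberOfExecutionRetries(3);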
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index c7ea06e..60aadf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -187,7 +187,7 @@ public class TaskExecutionState implements java.io.Serializable {
@Override
public String toString() {
- return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s",
+ return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s",
jobID, executionId, executionState,
throwable == null ? "(null)" : throwable.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 26d7272..75ad20f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
import akka.actor.ActorRef
+
/**
* Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
* submitted the job, the start time and, if already terminated, the end time.
@@ -29,7 +30,15 @@ import akka.actor.ActorRef
* @param client Actor which submitted the job
* @param start Starting time
*/
-class JobInfo(val client: ActorRef, val start: Long){
+class JobInfo(val client: ActorRef, val start: Long,
+ val sessionTimeout: Long) {
+
+ var sessionAlive = sessionTimeout > 0
+
+ var lastActive = 0L
+
+ setLastActive()
+
var end: Long = -1
def duration: Long = {
@@ -39,8 +48,13 @@ class JobInfo(val client: ActorRef, val start: Long){
-1
}
}
+
+ def setLastActive() =
+ lastActive = System.currentTimeMillis()
}
object JobInfo{
- def apply(client: ActorRef, start: Long) = new JobInfo(client, start)
+ def apply(client: ActorRef, start: Long,
+ sessionTimeout: Long) =
+ new JobInfo(client, start, sessionTimeout)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d93b2ed..444ab0b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -74,6 +74,8 @@ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool
import scala.language.postfixOps
import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+
/**
* The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -121,7 +123,7 @@ class JobManager(
override val log = Logger(getClass)
- /** List of current jobs running jobs */
+ /** Jobs that are either still running or finished but not yet archived (their session has not been ended). */
protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
var leaderSessionID: Option[UUID] = None
@@ -426,7 +428,19 @@ class JobManager(
}
}
- removeJob(jobID)
+
+ if (jobInfo.sessionAlive) {
+ jobInfo.setLastActive()
+ val lastActivity = jobInfo.lastActive
+ context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+ // remove only if no activity occurred in the meantime
+ if (lastActivity == jobInfo.lastActive) {
+ removeJob(jobID)
+ }
+ }
+ } else {
+ removeJob(jobID)
+ }
}
case None =>
@@ -555,6 +569,18 @@ class JobManager(
case RequestJobManagerStatus =>
sender() ! decorateMessage(JobManagerStatusAlive)
+ case RemoveCachedJob(jobID) =>
+ currentJobs.get(jobID) match {
+ case Some((graph, info)) =>
+ if (graph.getState.isTerminalState) {
+ removeJob(graph.getJobID)
+ } else {
+ // triggers removal upon completion of job
+ info.sessionAlive = false
+ }
+ case None =>
+ }
+
case Disconnect(msg) =>
val taskManager = sender()
@@ -624,19 +650,26 @@ class JobManager(
}
// see if there already exists an ExecutionGraph for the corresponding job ID
- executionGraph = currentJobs.getOrElseUpdate(
- jobGraph.getJobID,
- (new ExecutionGraph(
- executionContext,
- jobGraph.getJobID,
- jobGraph.getName,
- jobGraph.getJobConfiguration(),
- timeout,
- jobGraph.getUserJarBlobKeys(),
- userCodeLoader),
- JobInfo(client, System.currentTimeMillis())
- )
- )._1
+ executionGraph = currentJobs.get(jobGraph.getJobID) match {
+ case Some((graph, jobInfo)) =>
+ jobInfo.setLastActive()
+ graph
+ case None =>
+ val graph = new ExecutionGraph(
+ executionContext,
+ jobGraph.getJobID,
+ jobGraph.getName,
+ jobGraph.getJobConfiguration,
+ timeout,
+ jobGraph.getUserJarBlobKeys,
+ userCodeLoader)
+ val jobInfo = JobInfo(
+ client,
+ System.currentTimeMillis(),
+ jobGraph.getSessionTimeout)
+ currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+ graph
+ }
// configure the execution graph
val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) {
@@ -990,25 +1023,26 @@ class JobManager(
* @param jobID ID of the job to remove and archive
*/
private def removeJob(jobID: JobID): Unit = {
- currentJobs.remove(jobID) match {
- case Some((eg, _)) =>
- try {
- eg.prepareForArchiving()
-
- archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
- } catch {
- case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
- "archiving.", t)
- }
+ currentJobs.synchronized {
+ currentJobs.remove(jobID) match {
+ case Some((eg, _)) =>
+ try {
+ eg.prepareForArchiving()
+ archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+ } catch {
+ case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+ "archiving.", t)
+ }
- case None =>
- }
+ case None =>
+ }
- try {
- libraryCacheManager.unregisterJob(jobID)
- } catch {
- case t: Throwable =>
- log.error(s"Could not properly unregister job $jobID form the library cache.", t)
+ try {
+ libraryCacheManager.unregisterJob(jobID)
+ } catch {
+ case t: Throwable =>
+ log.error(s"Could not properly unregister job $jobID form the library cache.", t)
+ }
}
}
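The scheduling logic above removes a cached job only if no new activity touched the session during
the timeout window. A rough Java sketch of that guard; the JobManager itself uses the Akka
scheduler as shown, and all names below are illustrative:

    // illustrative sketch, not from the diff
    final AtomicLong lastActive = new AtomicLong(System.currentTimeMillis());
    final long sessionTimeoutSeconds = 2;

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    final long lastActivity = lastActive.get();   // snapshot taken when the job finishes
    scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            // a resubmission in the meantime bumps lastActive, so this removal becomes a no-op
            if (lastActivity == lastActive.get()) {
                System.out.println("removing cached job and archiving its ExecutionGraph");
            }
        }
    }, sessionTimeoutSeconds, TimeUnit.SECONDS);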
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d3fc8b1..bef52e0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -80,10 +80,13 @@ class MemoryArchivist(private val max_entries: Int)
override def handleMessage: Receive = {
/* Receive Execution Graph to archive */
- case ArchiveExecutionGraph(jobID, graph) =>
- // wrap graph inside a soft reference
- graphs.update(jobID, graph)
-
+ case ArchiveExecutionGraph(jobID, graph) =>
+ // Keep LRU order in case we override a graph (from multiple job submissions in one session).
+ // This deletes the old ExecutionGraph with this JobID from the history but avoids storing
+ // redundant ExecutionGraphs.
+ // TODO Allow ExecutionGraphs with the same jobID to be stored and displayed in web interface
+ graphs.remove(jobID)
+ graphs.put(jobID, graph)
// update job counters
graph.getState match {
case JobStatus.FINISHED => finishedCnt += 1
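Side note on the remove-then-put above: with an insertion-ordered map, putting an existing key
keeps its old position, so the graph has to be removed first to count as the most recently
archived entry. A minimal Java illustration of the same effect, with a plain LinkedHashMap
standing in for the archivist's bounded map:

    // illustrative sketch, not from the diff
    LinkedHashMap<JobID, String> graphs = new LinkedHashMap<JobID, String>();

    JobID jobId = JobID.generate();
    graphs.put(jobId, "first submission's graph");

    graphs.remove(jobId);                           // forget the old position
    graphs.put(jobId, "latest submission's graph"); // re-insert as the youngest entry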
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index d7bbb8d..3869392 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -275,6 +275,12 @@ object JobManagerMessages {
case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
/**
+ * Removes the job belonging to the job identifier from the job manager and archives it.
+ * @param jobID The job identifier
+ */
+ case class RemoveCachedJob(jobID: JobID)
+
+ /**
* Requests the instances of all registered task managers.
*/
case object RequestRegisteredTaskManagers
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bbd011a..839193b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -416,8 +416,8 @@ abstract class FlinkMiniCluster(
jobManagerGateway,
jobGraph,
timeout,
- printUpdates,
- this.getClass().getClassLoader())
+ printUpdates,
+ this.getClass.getClassLoader())
} finally {
if(!useSingleActorSystem) {
// we have to shutdown the just created actor system
@@ -440,7 +440,10 @@ abstract class FlinkMiniCluster(
)
}
- JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, getClass().getClassLoader())
+ JobClient.submitJobDetached(jobManagerGateway,
+ jobGraph,
+ timeout,
+ this.getClass.getClassLoader())
new JobSubmissionResult(jobGraph.getJobID)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index fdbffaa..5753cde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -107,7 +107,8 @@ public class PartialConsumePipelinedResultTest {
flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
jobGraph,
TestingUtils.TESTING_DURATION(),
- false, this.getClass().getClassLoader());
+ false,
+ this.getClass().getClassLoader());
}
// ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 5594bfe..075c1c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -52,7 +52,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandP
import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
/**
- * Tests that the JobManager process properly exits when the JobManager actor dies.
+ * Tests that the TaskManager process properly exits when the TaskManager actor dies.
*/
public class TaskManagerProcessReapingTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 74b7680..3a252f8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager
+
import Tasks._
import akka.actor.ActorSystem
import akka.actor.Status.{Success, Failure}
@@ -27,13 +28,13 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph, ScheduleMode}
import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
+import org.apache.flink.runtime.testingUtils.{TestingUtils, ScalaTestingUtils}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
+
import org.junit.runner.RunWith
+import org.scalatest.{Matchers, BeforeAndAfterAll, WordSpecLike}
import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -617,6 +618,121 @@ class JobManagerITCase(_system: ActorSystem)
cluster.stop()
}
}
+
+ "remove execution graphs when the client ends the session explicitly" in {
+ val vertex = new JobVertex("Test Vertex")
+ vertex.setInvokableClass(classOf[NoOpInvokable])
+
+ val jobGraph1 = new JobGraph("Test Job", vertex)
+
+ val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000)
+ slowVertex.setInvokableClass(classOf[NoOpInvokable])
+
+ val jobGraph2 = new JobGraph("Long running Job", slowVertex)
+
+ val cluster = TestingUtils.startTestingCluster(1)
+ val jm = cluster.getLeaderGateway(1 seconds)
+
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ /* jobgraph1 is removed after being terminated */
+ jobGraph1.setSessionTimeout(9999)
+ jm.tell(SubmitJob(jobGraph1, ListeningBehaviour.EXECUTION_RESULT), self)
+ expectMsg(JobSubmitSuccess(jobGraph1.getJobID))
+ expectMsgType[JobResultSuccess]
+
+ // should not be archived yet
+ jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+ var cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(!cachedGraph.isArchived)
+
+ jm.tell(RemoveCachedJob(jobGraph1.getJobID), self)
+
+ jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+ cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(cachedGraph.isArchived)
+
+ /* jobgraph2 is removed while running */
+ jobGraph2.setSessionTimeout(9999)
+ jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
+ expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
+
+ // job still running
+ jm.tell(RemoveCachedJob(jobGraph2.getJobID), self)
+
+ expectMsgType[JobResultSuccess]
+
+ // should be archived!
+ jm.tell(RequestExecutionGraph(jobGraph2.getJobID), self)
+ cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(cachedGraph.isArchived)
+ }
+ } finally {
+ cluster.stop()
+ }
+ }
+
+ "remove execution graphs when when the client's session times out" in {
+ val vertex = new JobVertex("Test Vertex")
+ vertex.setParallelism(1)
+ vertex.setInvokableClass(classOf[NoOpInvokable])
+
+ val jobGraph = new JobGraph("Test Job", vertex)
+
+ val cluster = TestingUtils.startTestingCluster(1)
+ val jm = cluster.getLeaderGateway(1 seconds)
+
+ try {
+ within(TestingUtils.TESTING_DURATION) {
+ // try multiple times in case of flaky environments
+ var testSucceeded = false
+ var numTries = 0
+ while(!testSucceeded && numTries < 10) {
+ try {
+ // should be removed immediately
+ jobGraph.setSessionTimeout(0)
+ jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultSuccess]
+
+ jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+ val cachedGraph2 = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(cachedGraph2.isArchived)
+
+ // removed after 2 seconds
+ jobGraph.setSessionTimeout(2)
+
+ jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+ expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+ expectMsgType[JobResultSuccess]
+
+ // should not be archived yet
+ jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+ val cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(!cachedGraph.isArchived)
+
+ // wait until graph is archived
+ Thread.sleep(3000)
+
+ jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+ val graph = expectMsgType[ExecutionGraphFound].executionGraph
+ assert(graph.isArchived)
+
+ testSucceeded = true
+ } catch {
+ case e: Throwable =>
+ numTries += 1
+ }
+ }
+ if(!testSucceeded) {
+ fail("Test case failed after " + numTries + " probes.")
+ }
+ }
+ } finally {
+ cluster.stop()
+ }
+ }
+
}
class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends JobVertex(name){
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 5265134..745b0d3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -22,7 +22,7 @@ import java.util.UUID
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
+import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.operators.DataSource
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -131,7 +131,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
*/
- def getId: UUID = {
+ def getId: JobID = {
javaEnv.getId
}
@@ -148,6 +148,33 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
}
/**
+ * Starts a new session, discarding all intermediate results.
+ */
+ def startNewSession() {
+ javaEnv.startNewSession()
+ }
+
+ /**
+ * Sets the session timeout to hold the intermediate results of a job. The updated
+ * timeout only takes effect for subsequent job executions.
+ * @param timeout The timeout in seconds.
+ */
+ def setSessionTimeout(timeout: Long) {
+ javaEnv.setSessionTimeout(timeout)
+ }
+
+ /**
+ * Gets the session timeout for this environment. The session timeout defines for how long
+ * after an execution, the job and its intermediate results will be kept for future
+ * interactions.
+ *
+ * @return The session timeout, in seconds.
+ */
+ def getSessionTimeout: Long = {
+ javaEnv.getSessionTimeout
+ }
+
+ /**
* Registers the given type with the serializer at the [[KryoSerializer]].
*
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index f691806..e2d91af 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,12 +19,21 @@
package org.apache.flink.api.avro;
import java.io.File;
+import java.net.InetAddress;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
import org.junit.Assert;
import org.junit.Test;
@@ -32,35 +41,34 @@ import org.junit.Test;
public class AvroExternalJarProgramITCase {
private static final String JAR_FILE = "target/maven-test-jar.jar";
-
+
private static final String TEST_DATA_FILE = "/testdata.avro";
@Test
public void testExternalProgram() {
-
+
ForkableFlinkMiniCluster testMiniCluster = null;
-
+
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster = new ForkableFlinkMiniCluster(config, false);
testMiniCluster.start();
-
+
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
-
+
PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
-
- Client c = new Client(
- config,
- program.getUserCodeClassLoader(),
- -1);
-
- c.setPrintStatusDuringExecution(false);
- c.run(program, 4, true);
+
+ Client client = new Client(config);
+
+ client.setPrintStatusDuringExecution(false);
+ client.runBlocking(program, 4);
+
}
catch (Throwable t) {
System.err.println(t.getMessage());
@@ -71,7 +79,9 @@ public class AvroExternalJarProgramITCase {
if (testMiniCluster != null) {
try {
testMiniCluster.stop();
- } catch (Throwable t) {}
+ } catch (Throwable t) {
+ // ignore
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ae2c047..29439f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -31,10 +30,12 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
+
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
private final String host;
@@ -117,17 +118,17 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+ Client client;
try {
- Client client = new Client(configuration, usercodeClassLoader, -1);
+ client = new Client(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-
- JobSubmissionResult result = client.run(jobGraph, true);
- if (result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
- }
+ }
+ catch (Exception e) {
+ throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
+ }
+
+ try {
+ return client.runBlocking(jobGraph, usercodeClassLoader);
}
catch (ProgramInvocationException e) {
throw e;
@@ -136,6 +137,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
}
+ finally {
+ client.shutdown();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index c2335d6..2b2a426 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -23,10 +23,12 @@ import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,17 +36,25 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
- protected List<File> jars;
- protected Client client;
+ private final List<File> jars;
+
+ private final Client client;
+
+ private final ClassLoader userCodeClassLoader;
+
private final boolean wait;
protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
this.client = client;
this.jars = jars;
this.wait = wait;
+
+ this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+
if (parallelism > 0) {
setParallelism(parallelism);
- } else {
+ }
+ else {
// first check for old parallelism config key
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
@@ -73,16 +83,18 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
transformations.clear();
+ // attach all necessary jar files to the JobGraph
for (File file : jars) {
jobGraph.addJar(new Path(file.getAbsolutePath()));
}
- JobSubmissionResult result = client.run(jobGraph, wait);
- if(result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
+ // execute the programs
+ if (wait) {
+ return client.runBlocking(jobGraph, userCodeClassLoader);
} else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+ JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
+ LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+ return JobExecutionResult.fromJobSubmissionResult(result);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ff785c4..c50f23e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
@@ -40,9 +41,9 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FileStateHandle;
@@ -1282,11 +1283,11 @@ public abstract class StreamExecutionEnvironment {
*
* @param jobName
* Desired name of the job
- * @return The result of the job execution, containing elapsed time and
- * accumulators.
+ * @return The result of the job execution: either a JobSubmissionResult or a JobExecutionResult;
+ * the latter contains elapsed time and accumulators.
* @throws Exception
*/
- public abstract JobExecutionResult execute(String jobName) throws Exception;
+ public abstract JobSubmissionResult execute(String jobName) throws Exception;
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 8c1408e..e5ea2c5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -68,6 +67,6 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
}
- throw new Client.ProgramAbortException();
+ throw new OptimizerPlanEnvironment.ProgramAbortException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
index 6d1b1c7..4c091e5 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -68,4 +68,9 @@ public class LocalTezEnvironment extends ExecutionEnvironment {
};
initializeContextEnvironment(factory);
}
+
+ @Override
+ public void startNewSession() throws Exception {
+ throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
index b155527..a02b536 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -75,4 +75,9 @@ public class RemoteTezEnvironment extends ExecutionEnvironment {
compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
executor = new TezExecutor(compiler, this.getDegreeOfParallelism());
}
+
+ @Override
+ public void startNewSession() throws Exception {
+ throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
index a54724f..60449db 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.tez.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.configuration.Configuration;
@@ -115,6 +116,26 @@ public class TezExecutor extends PlanExecutor {
}
@Override
+ public void start() throws Exception {
+ throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+ }
+
+ @Override
+ public void endSession(JobID jobID) throws Exception {
+ throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
return executePlanWithConf(tezConf, plan);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index e8b7e86..566573e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -55,6 +55,10 @@ public class TestEnvironment extends ExecutionEnvironment {
}
@Override
+ public void startNewSession() throws Exception {
+ }
+
+ @Override
public JobExecutionResult execute(String jobName) throws Exception {
try {
OptimizedPlan op = compileProgram(jobName);
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 3991ac0..3e2657c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -55,7 +55,8 @@ public class LocalExecutorITCase {
executor.setTaskManagerNumSlots(parallelism);
executor.setPrintStatusDuringExecution(false);
executor.start();
- Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(), inFile.toURI().toString(),outFile.toURI().toString());
+ Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(),
+ inFile.toURI().toString(), outFile.toURI().toString());
wcPlan.setExecutionConfig(new ExecutionConfig());
executor.executePlan(wcPlan);
executor.stop();
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 68b099d..b1768f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -102,7 +102,7 @@ public class RemoteEnvironmentITCase {
try {
env.execute();
Assert.fail("Program should not run successfully, cause of invalid akka settings.");
- } catch (ProgramInvocationException ex) {
+ } catch (IOException ex) {
throw ex.getCause();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 082532e..24d9416 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.optimizer.jsonplan;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.Client.ProgramAbortException;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.examples.java.clustering.KMeans;
@@ -66,7 +66,7 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
try {
// <points path> <centers path> <result path> <num iterations
KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
- } catch(ProgramAbortException pae) {
+ } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
// all good.
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 121bc88..a862242 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -311,6 +311,10 @@ public class JsonJobGraphGenerationTest {
}
@Override
+ public void startNewSession() throws Exception {
+ }
+
+ @Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index b0de0e8..6aea26c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -18,7 +18,6 @@
package org.apache.flink.api.scala.runtime.jobmanager
-import akka.actor.Status.Success
import akka.actor.{ActorSystem, PoisonPill}
import akka.testkit.{ImplicitSender, TestKit}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index fb8e5a1..3337f1a 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -507,7 +507,7 @@ public abstract class YarnTestBase extends TestLogger {
"expected string did not show up", expectedStringSeen);
// check for 0 return code
- Assert.assertTrue("Expecting return value == "+returnCode, runner.getReturnValue() == returnCode);
+ Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
LOG.info("Test was successful");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 57a5010..bd1698a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.net.NetUtils;
@@ -231,6 +232,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
*/
@Override
public void stopAfterJob(JobID jobID) {
+ Preconditions.checkNotNull("The job id must not be null", jobID);
Future<Object> messageReceived = ask(applicationClient, new Messages.StopAMAfterJob(jobID), akkaTimeout);
try {
Await.result(messageReceived, akkaDuration);
[4/4] flink git commit: [FLINK-2097][core] implement a job session
management
Posted by mx...@apache.org.
[FLINK-2097][core] implement a job session management
Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.
This pull request implements rudimentary session management. Together
with the backtracking #640, this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are cleared eventually.
ExecutionGraphs are kept as long as
- no timeout occurred or
- the session has not been explicitly ended
The following changes have also been made in this pull request:
- The Job ID is created through the ExecutionEnvironment and passed through
- Sessions can be terminated by the ExecutionEnvironment or directly
through the executor
- The environments use reapers (local) and shutdown hooks (remote) to
ensure session termination when the environment runs out of scope
- The Client only manages connections to the JobManager; it is not job-specific
This closes #858.
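For illustration only (not part of the patch): a minimal sketch of how a user program might drive the session API introduced by this commit. It assumes the Java ExecutionEnvironment exposes setSessionTimeout() and startNewSession() as added here; the input data, output path and timeout value are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SessionUsageSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// keep the job and its intermediate results on the JobManager for
		// 30 seconds after each execution, so follow-up jobs can attach to them
		env.setSessionTimeout(30);

		DataSet<String> text = env.fromElements("to be", "or not to be");
		text.writeAsText("/tmp/session-sketch-out"); // placeholder output path
		env.execute("first job in the session");

		// discard all intermediate results and start over with a fresh JobID
		env.startNewSession();
	}
}

The same calls are mirrored on the Scala ExecutionEnvironment further down in this commit.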
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57
Branch: refs/heads/master
Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9
Parents: 7984acc
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 4 17:34:44 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 182 +++---
.../org/apache/flink/client/LocalExecutor.java | 222 ++++---
.../org/apache/flink/client/RemoteExecutor.java | 188 ++++--
.../org/apache/flink/client/program/Client.java | 653 +++++++++----------
.../client/program/ContextEnvironment.java | 40 +-
.../flink/client/program/JobWithJars.java | 5 +-
.../program/OptimizerPlanEnvironment.java | 132 ++++
.../flink/client/program/PackagedProgram.java | 51 +-
.../client/program/PreviewPlanEnvironment.java | 80 +++
.../flink/client/web/JobSubmissionServlet.java | 30 +-
.../flink/client/CliFrontendInfoTest.java | 81 +--
.../client/CliFrontendPackageProgramTest.java | 10 +-
.../apache/flink/client/CliFrontendRunTest.java | 30 +-
.../RemoteExecutorHostnameResolutionTest.java | 6 +-
.../client/program/ClientConnectionTest.java | 8 +-
.../apache/flink/client/program/ClientTest.java | 142 ++--
.../ExecutionPlanAfterExecutionTest.java | 15 +-
.../program/ExecutionPlanCreationTest.java | 9 +-
.../client/program/PackagedProgramTest.java | 1 -
.../stormcompatibility/api/FlinkClient.java | 22 +-
.../flink/api/common/JobExecutionResult.java | 17 +-
.../java/org/apache/flink/api/common/JobID.java | 46 +-
.../flink/api/common/JobSubmissionResult.java | 5 +-
.../java/org/apache/flink/api/common/Plan.java | 71 +-
.../apache/flink/api/common/PlanExecutor.java | 85 ++-
.../flink/api/java/CollectionEnvironment.java | 4 +
.../flink/api/java/ExecutionEnvironment.java | 97 ++-
.../apache/flink/api/java/LocalEnvironment.java | 180 ++++-
.../flink/api/java/RemoteEnvironment.java | 153 ++++-
.../flink/optimizer/plan/OptimizedPlan.java | 15 +-
.../plantranslate/JobGraphGenerator.java | 24 +-
.../optimizer/postpass/JavaApiPostPass.java | 2 +-
.../apache/flink/runtime/client/JobClient.java | 11 +-
.../runtime/client/JobExecutionException.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 8 +
.../apache/flink/runtime/jobgraph/JobGraph.java | 58 +-
.../runtime/taskmanager/TaskExecutionState.java | 2 +-
.../flink/runtime/jobmanager/JobInfo.scala | 18 +-
.../flink/runtime/jobmanager/JobManager.scala | 98 ++-
.../runtime/jobmanager/MemoryArchivist.scala | 11 +-
.../runtime/messages/JobManagerMessages.scala | 6 +
.../runtime/minicluster/FlinkMiniCluster.scala | 9 +-
.../PartialConsumePipelinedResultTest.java | 3 +-
.../TaskManagerProcessReapingTest.java | 2 +-
.../runtime/jobmanager/JobManagerITCase.scala | 126 +++-
.../flink/api/scala/ExecutionEnvironment.scala | 31 +-
.../api/avro/AvroExternalJarProgramITCase.java | 38 +-
.../environment/RemoteStreamEnvironment.java | 24 +-
.../environment/StreamContextEnvironment.java | 28 +-
.../environment/StreamExecutionEnvironment.java | 11 +-
.../api/environment/StreamPlanEnvironment.java | 7 +-
.../flink/tez/client/LocalTezEnvironment.java | 5 +
.../flink/tez/client/RemoteTezEnvironment.java | 5 +
.../apache/flink/tez/client/TezExecutor.java | 21 +
.../apache/flink/test/util/TestEnvironment.java | 4 +
.../clients/examples/LocalExecutorITCase.java | 3 +-
.../RemoteEnvironmentITCase.java | 2 +-
.../jsonplan/DumpCompiledPlanTest.java | 6 +-
.../jsonplan/JsonJobGraphGenerationTest.java | 4 +
.../jobmanager/JobManagerFailsITCase.scala | 1 -
.../org/apache/flink/yarn/YarnTestBase.java | 2 +-
.../org/apache/flink/yarn/FlinkYarnCluster.java | 2 +
62 files changed, 2114 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index fc4d98a..f0e6c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -297,44 +300,51 @@ public class CliFrontend {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+ Client client = getClient(options, program.getMainClassName(), userParallelism);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
- if(client.getMaxSlots() != -1 && userParallelism == -1) {
- logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
- "To use another parallelism, set it at the ./bin/flink client.");
- userParallelism = client.getMaxSlots();
- }
- // check if detached per job yarn cluster is used to start flink
- if(yarnCluster != null && yarnCluster.isDetached()) {
- logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
- "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
- exitCode = executeProgram(program, client, userParallelism, false);
- } else {
- // regular (blocking) execution.
- exitCode = executeProgram(program, client, userParallelism, true);
- }
+ try {
+ if (client.getMaxSlots() != -1 && userParallelism == -1) {
+ logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+ "To use another parallelism, set it at the ./bin/flink client.");
+ userParallelism = client.getMaxSlots();
+ }
- // show YARN cluster status if its not a detached YARN cluster.
- if (yarnCluster != null && !yarnCluster.isDetached()) {
- List<String> msgs = yarnCluster.getNewMessages();
- if (msgs != null && msgs.size() > 1) {
+ // check if detached per job yarn cluster is used to start flink
+ if (yarnCluster != null && yarnCluster.isDetached()) {
+ logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+ "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+ "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+ exitCode = executeProgramDetached(program, client, userParallelism);
+ }
+ else {
+ // regular (blocking) execution.
+ exitCode = executeProgramBlocking(program, client, userParallelism);
+ }
+
+ // show YARN cluster status if its not a detached YARN cluster.
+ if (yarnCluster != null && !yarnCluster.isDetached()) {
+ List<String> msgs = yarnCluster.getNewMessages();
+ if (msgs != null && msgs.size() > 1) {
- logAndSysout("The following messages were created by the YARN cluster while running the Job:");
- for (String msg : msgs) {
- logAndSysout(msg);
+ logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+ for (String msg : msgs) {
+ logAndSysout(msg);
+ }
+ }
+ if (yarnCluster.hasFailed()) {
+ logAndSysout("YARN cluster is in failed state!");
+ logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}
- if (yarnCluster.hasFailed()) {
- logAndSysout("YARN cluster is in failed state!");
- logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
- }
- }
- return exitCode;
+ return exitCode;
+ }
+ finally {
+ client.shutdown();
+ }
}
catch (Throwable t) {
return handleError(t);
@@ -395,8 +405,10 @@ public class CliFrontend {
int parallelism = options.getParallelism();
LOG.info("Creating program plan dump");
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
- FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);
+
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+
+ FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
if (webFrontend) {
this.optimizedPlan = flinkPlan;
@@ -425,6 +437,8 @@ public class CliFrontend {
}
}
return 0;
+
+
}
catch (Throwable t) {
return handleError(t);
@@ -623,52 +637,65 @@ public class CliFrontend {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
- protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
- LOG.info("Starting execution of program");
- JobSubmissionResult execResult;
+ protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ JobSubmissionResult result;
try {
- execResult = client.run(program, parallelism, wait);
- }
- catch (ProgramInvocationException e) {
+ result = client.runDetached(program, parallelism);
+ } catch (ProgramInvocationException e) {
return handleError(e);
- }
- finally {
+ } finally {
program.deleteExtractedLibraries();
}
- if(wait) {
- LOG.info("Program execution finished");
- }
-
- // we come here after the job has finished (or the job has been submitted)
- if (execResult != null) {
+ if (result != null) {
// if the job has been submitted to a detached YARN cluster, there won't be any
// exec results, but the object will be set (for the job id)
if (yarnCluster != null && yarnCluster.isDetached()) {
- if(execResult.getJobID() == null) {
- throw new RuntimeException("Error while starting job. No Job ID set.");
- }
- yarnCluster.stopAfterJob(execResult.getJobID());
+
+ yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
- if(!webFrontend) {
- System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+ if (!webFrontend) {
+ System.out.println("The Job has been submitted with JobID " + result.getJobID());
}
return 0;
- }
- if (execResult instanceof JobExecutionResult) {
- JobExecutionResult result = (JobExecutionResult) execResult;
- if(!webFrontend) {
- System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
- }
- Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
- if (accumulatorsResult.size() > 0 && !webFrontend) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
- }
} else {
- LOG.info("The Job did not return an execution result");
+ throw new RuntimeException("Error while starting job. No Job ID set.");
+ }
+ }
+
+ return 0;
+ }
+
+ protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+ LOG.info("Starting execution of program");
+
+ JobExecutionResult result;
+ try {
+ client.setPrintStatusDuringExecution(true);
+ result = client.runBlocking(program, parallelism);
+ }
+ catch (ProgramInvocationException e) {
+ return handleError(e);
+ }
+ finally {
+ program.deleteExtractedLibraries();
+ }
+
+ LOG.info("Program execution finished");
+
+ if (result != null) {
+ if (!webFrontend) {
+ System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
+ }
+ Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+ if (accumulatorsResult.size() > 0 && !webFrontend) {
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
+ } else {
+ LOG.info("The Job did not return an execution result");
}
+
return 0;
}
@@ -767,7 +794,6 @@ public class CliFrontend {
* Retrieves a {@link Client} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
- * @param classLoader Class loader to use by the Client
* @param programName Program name
* @param userParallelism Given user parallelism
* @return
@@ -775,12 +801,10 @@ public class CliFrontend {
*/
protected Client getClient(
CommandLineOptions options,
- ClassLoader classLoader,
String programName,
int userParallelism)
throws Exception {
- InetSocketAddress jobManagerAddress = null;
-
+ InetSocketAddress jobManagerAddress;
int maxSlots = -1;
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -796,14 +820,16 @@ public class CliFrontend {
// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
- if(yarnTmSlots == -1) {
+ if (yarnTmSlots == -1) {
yarnTmSlots = 1;
}
maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
- if(userParallelism != -1) {
+ if (userParallelism != -1) {
int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
- logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
- "Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+ logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
+ "but the user requested a parallelism of " + userParallelism + " on YARN. " +
+ "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
+ "will get "+slotsPerTM+" slots.");
flinkYarnClient.setTaskManagerSlots(slotsPerTM);
}
@@ -811,11 +837,12 @@ public class CliFrontend {
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
}
- catch(Exception e) {
+ catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
jobManagerAddress = yarnCluster.getJobManagerAddress();
+ writeJobManagerAddressToConfig(jobManagerAddress);
logAndSysout("YARN cluster started");
logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -847,14 +874,11 @@ public class CliFrontend {
else {
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+ writeJobManagerAddressToConfig(jobManagerAddress);
}
}
- if(jobManagerAddress != null) {
- writeJobManagerAddressToConfig(jobManagerAddress);
- }
-
- return new Client(config, classLoader, maxSlots);
+ return new Client(config, maxSlots);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index cf08e0a..7928e53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -22,11 +22,14 @@ import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
@@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
/**
- * A class for executing a {@link Plan} on a local embedded Flink runtime instance.
+ * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
+ *
+ * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method,
+ * this executor will start up and shut down again immediately after the program finishes.</p>
+ *
+ * <p>To use this executor to execute many dataflow programs that constitute one job together,
+ * the executor needs to be started explicitly, so that it keeps running across several executions.</p>
*/
public class LocalExecutor extends PlanExecutor {
- private static boolean DEFAULT_OVERWRITE = false;
+ private static final boolean DEFAULT_OVERWRITE = false;
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
- private final Object lock = new Object(); // we lock to ensure singleton execution
-
+ /** we lock to ensure singleton execution */
+ private final Object lock = new Object();
+
+ /** The mini cluster on which to execute the local programs */
private LocalFlinkMiniCluster flink;
+ /** Custom user configuration for the execution */
private Configuration configuration;
- // ---------------------------------- config options ------------------------------------------
-
+ /** Config value for how many slots to provide in the local cluster */
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ /** Config flag whether to overwrite existing files by default */
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-
- // --------------------------------------------------------------------------------------------
-
+
+ // ------------------------------------------------------------------------
+
public LocalExecutor() {
- if (!ExecutionEnvironment.localExecutionIsAllowed()) {
- throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
- }
+ this(null);
}
public LocalExecutor(Configuration conf) {
- this();
- this.configuration = conf;
+ if (!ExecutionEnvironment.localExecutionIsAllowed()) {
+ throw new InvalidProgramException(
+ "The LocalEnvironment cannot be used when submitting a program through a client.");
+ }
+
+ this.configuration = conf != null ? conf : new Configuration();
}
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
-
public boolean isDefaultOverwriteFiles() {
return defaultOverwriteFiles;
}
-
+
public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
this.defaultOverwriteFiles = defaultOverwriteFiles;
}
-
+
public void setTaskManagerNumSlots(int taskManagerNumSlots) {
this.taskManagerNumSlots = taskManagerNumSlots;
}
@@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor {
public int getTaskManagerNumSlots() {
return this.taskManagerNumSlots;
}
-
- // --------------------------------------------------------------------------------------------
- public static Configuration createConfiguration(LocalExecutor le) {
- Configuration configuration = new Configuration();
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
- configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
- return configuration;
- }
+ // --------------------------------------------------------------------------------------------
+ @Override
public void start() throws Exception {
- synchronized (this.lock) {
- if (this.flink == null) {
-
+ synchronized (lock) {
+ if (flink == null) {
// create the embedded runtime
- Configuration configuration = createConfiguration(this);
- if(this.configuration != null) {
+ Configuration configuration = createConfiguration();
+ if (this.configuration != null) {
configuration.addAll(this.configuration);
}
// start it up
- this.flink = new LocalFlinkMiniCluster(configuration, true);
+ flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
} else {
throw new IllegalStateException("The local executor was already started.");
}
}
}
-
- /**
- * Stop the local executor instance. You should not call executePlan after this.
- */
+
+ @Override
public void stop() throws Exception {
- synchronized (this.lock) {
- if (this.flink != null) {
- this.flink.stop();
- this.flink = null;
- } else {
- throw new IllegalStateException("The local executor was not started.");
+ synchronized (lock) {
+ if (flink != null) {
+ flink.stop();
+ flink = null;
}
}
}
+ @Override
+ public boolean isRunning() {
+ return flink != null;
+ }
+
/**
- * Execute the given plan on the local Nephele instance, wait for the job to
- * finish and return the runtime in milliseconds.
+ * Executes the given program on a local runtime and waits for the job to finish.
+ *
+ * <p>If the executor has not been started before, this starts the executor and shuts it down
+ * after the job finishes. If the job runs in session mode, the executor is kept alive until
+ * no more references to the executor exist.</p>
*
* @param plan The plan of the program to execute.
* @return The net runtime of the program, in milliseconds.
@@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
-
+
synchronized (this.lock) {
-
+
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;
- if (this.flink == null) {
- // we start a session just for us now
+
+ if (flink == null) {
shutDownAtEnd = true;
-
+
// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
@@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor {
this.taskManagerNumSlots = maxParallelism;
}
}
-
+
+ // start the cluster for us
start();
- } else {
+ }
+ else {
// we use the existing session
shutDownAtEnd = false;
}
@@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor {
Optimizer pc = new Optimizer(new DataStatistics(), configuration);
OptimizedPlan op = pc.compile(plan);
-
+
JobGraphGenerator jgg = new JobGraphGenerator(configuration);
- JobGraph jobGraph = jgg.compileJobGraph(op);
-
+ JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+
boolean sysoutPrint = isPrintingStatusDuringExecution();
return flink.submitJobAndWait(jobGraph, sysoutPrint);
}
@@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor {
}
/**
- * Returns a JSON dump of the optimized plan.
- *
- * @param plan
- * The program's plan.
- * @return JSON dump of the optimized plan.
- * @throws Exception
+ * Creates a JSON representation of the given dataflow's execution plan.
+ *
+ * @param plan The dataflow plan.
+ * @return The dataflow's execution plan, as a JSON string.
+ * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
+ final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+ Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
+ pc.setDefaultParallelism(parallelism);
OptimizedPlan op = pc.compile(plan);
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
- return gen.getOptimizerPlanAsJSON(op);
+
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}
-
+
+ @Override
+ public void endSession(JobID jobID) throws Exception {
+ LocalFlinkMiniCluster flink = this.flink;
+ if (flink != null) {
+ ActorGateway leaderGateway = flink.getLeaderGateway(AkkaUtils.getDefaultTimeout());
+ leaderGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
+ }
+ }
+
+ private Configuration createConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+ configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+ return configuration;
+ }
+
+
// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------
-
+
/**
- * Executes the program described by the given plan assembler.
+ * Executes the given program.
*
- * @param pa The program's plan assembler.
+ * @param pa The program.
* @param args The parameters.
- * @return The net runtime of the program, in milliseconds.
+ * @return The execution result of the program.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
@@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor {
public static JobExecutionResult execute(Program pa, String... args) throws Exception {
return execute(pa.getPlan(args));
}
-
+
/**
- * Executes the program represented by the given Pact plan.
+ * Executes the given dataflow plan.
*
- * @param plan The program's plan.
- * @return The net runtime of the program, in milliseconds.
+ * @param plan The dataflow plan.
+ * @return The execution result.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
public static JobExecutionResult execute(Plan plan) throws Exception {
- LocalExecutor exec = new LocalExecutor();
- try {
- exec.start();
- return exec.executePlan(plan);
- } finally {
- exec.stop();
- }
+ return new LocalExecutor().executePlan(plan);
}
/**
- * Returns a JSON dump of the optimized plan.
+ * Creates a JSON representation of the given dataflow's execution plan.
*
- * @param plan
- * The program's plan.
- * @return JSON dump of the optimized plan.
- * @throws Exception
+ * @param plan The dataflow plan.
+ * @return The dataflow's execution plan, as a JSON string.
+ * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
- LocalExecutor exec = new LocalExecutor();
- try {
- exec.start();
- Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
- OptimizedPlan op = pc.compile(plan);
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
- return gen.getOptimizerPlanAsJSON(op);
- } finally {
- exec.stop();
- }
+ final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+ Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+ pc.setDefaultParallelism(parallelism);
+ OptimizedPlan op = pc.compile(plan);
+
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}
/**
- * Return unoptimized plan as JSON.
+ * Creates a JSON representation of the given dataflow plan.
*
- * @param plan The program plan.
- * @return The plan as a JSON object.
+ * @param plan The dataflow plan.
+ * @return The dataflow plan (prior to optimization) as a JSON string.
*/
public static String getPlanAsJSON(Plan plan) {
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
- return gen.getPactPlanAsJSON(sinks);
+ return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
}
}
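
For orientation, a minimal usage sketch (illustrative only, not part of this commit) of how the reworked LocalExecutor can be driven: either through the one-shot static helper, or by keeping one local session alive across several plans. The Plan values are assumed to be produced elsewhere, e.g. by an ExecutionEnvironment.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.LocalExecutor;

public class LocalExecutorUsageSketch {

	// One-shot execution: the executor brings up a local mini cluster for this plan only
	// and shuts it down again when the job is done.
	static JobExecutionResult runOnce(Plan plan) throws Exception {
		return LocalExecutor.execute(plan);
	}

	// Session-style execution: start() brings the mini cluster up once, several plans are
	// submitted against it, and stop() tears it down at the end.
	static void runInSession(Plan first, Plan second) throws Exception {
		LocalExecutor executor = new LocalExecutor();
		executor.start();
		try {
			executor.executePlan(first);
			executor.executePlan(second);
		}
		finally {
			executor.stop();
		}
	}
}
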
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 20169f6..e8e9ade 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.client;
-import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -26,36 +25,41 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
* and ships it to a remote Flink cluster for execution.
*
- * The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
- * set of libraries that need to be shipped together with the program.
+ * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
+ * set of libraries that need to be shipped together with the program.</p>
*
- * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
- * remotely execute program parts.
+ * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.</p>
*/
public class RemoteExecutor extends PlanExecutor {
+
+ private final Object lock = new Object();
- private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
-
private final List<String> jarFiles;
private final Configuration clientConfiguration;
+
+ private Client client;
+
+ private int defaultParallelism = 1;
+
public RemoteExecutor(String hostname, int port) {
this(hostname, port, Collections.<String>emptyList(), new Configuration());
@@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor {
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the parallelism that will be used when the program does not define
+ * any parallelism at all.
+ *
+ * @param defaultParallelism The default parallelism for the executor.
+ */
+ public void setDefaultParallelism(int defaultParallelism) {
+ if (defaultParallelism < 1) {
+ throw new IllegalArgumentException("The default parallelism must be at least one");
+ }
+ this.defaultParallelism = defaultParallelism;
+ }
+
+ /**
+ * Gets the parallelism that will be used when the program does not define
+ * any parallelism at all.
+ *
+ * @return The default parallelism for the executor.
+ */
+ public int getDefaultParallelism() {
+ return defaultParallelism;
+ }
+
+ // ------------------------------------------------------------------------
+ // Startup & Shutdown
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public void start() throws Exception {
+ synchronized (lock) {
+ if (client == null) {
+ client = new Client(clientConfiguration);
+ client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+ }
+ else {
+ throw new IllegalStateException("The remote executor was already started.");
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (lock) {
+ if (client != null) {
+ client.shutdown();
+ client = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return client != null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Executing programs
+ // ------------------------------------------------------------------------
+
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
+ if (plan == null) {
+ throw new IllegalArgumentException("The plan may not be null.");
+ }
+
JobWithJars p = new JobWithJars(plan, this.jarFiles);
return executePlanWithJars(p);
}
-
- public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
- Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
- c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-
- JobSubmissionResult result = c.run(p, -1, true);
- if (result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+
+ public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
+ if (program == null) {
+ throw new IllegalArgumentException("The job may not be null.");
}
- }
- public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
- File jarFile = new File(jarPath);
- PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
-
- Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
- c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-
- JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
- if(result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+ synchronized (this.lock) {
+ // check if we start a session dedicated for this execution
+ final boolean shutDownAtEnd;
+
+ if (client == null) {
+ shutDownAtEnd = true;
+ // start the executor for us
+ start();
+ }
+ else {
+ // we use the existing session
+ shutDownAtEnd = false;
+ }
+
+ try {
+ return client.runBlocking(program, defaultParallelism);
+ }
+ finally {
+ if (shutDownAtEnd) {
+ stop();
+ }
+ }
}
}
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- JobWithJars p = new JobWithJars(plan, this.jarFiles);
- Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-
- OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
- PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
- return jsonGen.getOptimizerPlanAsJSON(op);
+ Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+ OptimizedPlan optPlan = opt.compile(plan);
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}
-
+
+ @Override
+ public void endSession(JobID jobID) throws Exception {
+ if (jobID == null) {
+ throw new NullPointerException("The supplied jobID must not be null.");
+ }
+
+ synchronized (this.lock) {
+ // check if we start a session dedicated for this execution
+ final boolean shutDownAtEnd;
+
+ if (client == null) {
+ shutDownAtEnd = true;
+ // start the executor for us
+ start();
+ }
+ else {
+ // we use the existing session
+ shutDownAtEnd = false;
+ }
+
+ try {
+ client.endSession(jobID);
+ }
+ finally {
+ if (shutDownAtEnd) {
+ stop();
+ }
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor {
}
return new InetSocketAddress(host, port);
}
-
}
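
Similarly, an illustrative sketch (not from this commit) of driving the session-aware RemoteExecutor; the host name, port, parallelism, and the origin of the Plan are placeholders.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.RemoteExecutor;

public class RemoteExecutorUsageSketch {

	static JobExecutionResult runWithSession(Plan plan) throws Exception {
		// placeholder JobManager address
		RemoteExecutor executor = new RemoteExecutor("jobmanager-host", 6123);
		executor.setDefaultParallelism(4);

		// start() creates the internal Client once; subsequent executePlan() calls reuse it,
		// and stop() shuts the session down again.
		executor.start();
		try {
			return executor.executePlan(plan);
		}
		finally {
			executor.stop();
		}
	}
}
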
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e7464c8..6c886fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,10 +18,9 @@
package org.apache.flink.client.program;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,8 +31,6 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -65,7 +62,6 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -78,62 +74,139 @@ public class Client {
* The configuration to use for the client (optimizer, timeouts, ...) and to connect to the
* JobManager.
*/
- private final Configuration configuration;
-
/** The optimizer used in the optimization of batch programs */
- private final Optimizer compiler;
+ final Optimizer compiler;
+
+ /** The actor system used to communicate with the JobManager */
+ private final ActorSystem actorSystem;
- /** The class loader to use for classes from the user program (e.g., functions and data types) */
- private final ClassLoader userCodeClassLoader;
+ /** The actor reference to the JobManager */
+ private final ActorGateway jobManagerGateway;
+
+ /** The timeout for communication between the client and the JobManager */
+ private final FiniteDuration timeout;
+
+ /**
+ * If != -1, this field specifies the total number of available slots on the cluster
+ * connected to the client.
+ */
+ private final int maxSlots;
/** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;
/**
- * If != -1, this field specifies the total number of available slots on the cluster
- * connected to the client.
+ * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+ * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+ * which lets us access the last JobID here.
*/
- private int maxSlots;
+ private JobID lastJobID;
- /** ID of the last job submitted with this client. */
- private JobID lastJobId = null;
-
-
// ------------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------------
/**
* Creates an instance that submits the programs to the JobManager defined in the
- * configuration. It sets the maximum number of slots to unknown (= -1).
+ * configuration. This method will try to resolve the JobManager hostname and throw an exception
+ * if that is not possible.
*
- * @param config The config used to obtain the JobManager's address.
- * @param userCodeClassLoader The class loader to use for loading user code classes.
+ * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
+ *
+ * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+ * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
*/
- public Client(Configuration config, ClassLoader userCodeClassLoader) {
- this(config, userCodeClassLoader, -1);
+ public Client(Configuration config) throws IOException {
+ this(config, -1);
}
/**
- * Creates a instance that submits the programs to the JobManager defined in the
- * configuration.
+ * Creates a new instance of the class that submits the jobs to the job-manager
+ * defined in the given configuration.
*
- * @param config The config used to obtain the JobManager's address.
- * @param userCodeClassLoader The class loader to use for loading user code classes.
- * @param maxSlots The number of maxSlots on the cluster if != -1
+ * @param config The configuration for the client-side processes, like the optimizer.
+ * @param maxSlots The total number of available slots on the cluster if != -1.
+ *
+ * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+ * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
*/
- public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
- Preconditions.checkNotNull(config, "Configuration is null");
- Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
-
- this.configuration = config;
- this.userCodeClassLoader = userCodeClassLoader;
+ public Client(Configuration config, int maxSlots) throws IOException {
- this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+ this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
this.maxSlots = maxSlots;
+
+ LOG.info("Starting client actor system");
+
+ try {
+ this.actorSystem = JobClient.startJobClientActorSystem(config);
+ } catch (Exception e) {
+ throw new IOException("Could start client actor system.", e);
+ }
+
+ // from here on, we need to make sure the actor system is shut down on error
+ boolean success = false;
+
+ try {
+
+ FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config);
+ this.timeout = AkkaUtils.getTimeout(config);
+
+ LOG.info("Looking up JobManager");
+ LeaderRetrievalService leaderRetrievalService;
+
+ try {
+ leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+ } catch (Exception e) {
+ throw new IOException("Could not create the leader retrieval service.", e);
+ }
+
+ try {
+ this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+ leaderRetrievalService,
+ actorSystem,
+ lookupTimeout);
+ } catch (LeaderRetrievalException e) {
+ throw new IOException("Failed to retrieve JobManager gateway", e);
+ }
+
+ LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path());
+
+ LOG.info("JobManager runs at " + this.jobManagerGateway.path());
+
+ LOG.info("Communication between client and JobManager will have a timeout of " + this.timeout);
+ success = true;
+ } finally {
+ if (!success) {
+ try {
+ this.actorSystem.shutdown();
+
+ // wait at most for 30 seconds, to work around an occasional akka problem
+ actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
+ } catch (Throwable t) {
+ LOG.error("Shutting down actor system after error caused another error", t);
+ }
+ }
+ }
}
+ // ------------------------------------------------------------------------
+ // Startup & Shutdown
+ // ------------------------------------------------------------------------
/**
+ * Shuts down the client. This stops the internal actor system and actors.
+ */
+ public void shutdown() {
+ if (!this.actorSystem.isTerminated()) {
+ this.actorSystem.shutdown();
+ this.actorSystem.awaitTermination();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
+
+ /**
* Configures whether the client should print progress updates during the execution to {@code System.out}.
* All updates are logged via the SLF4J loggers regardless of this setting.
*
@@ -159,118 +232,84 @@ public class Client {
}
// ------------------------------------------------------------------------
- // Compilation and Submission
+ // Access to the Program's Plan
// ------------------------------------------------------------------------
- public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+ public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
- return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism));
+ return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
}
-
- public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+
+ public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
- return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
- }
- else if (prog.isUsingInteractiveMode()) {
+ return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+ } else if (prog.isUsingInteractiveMode()) {
// temporary hack to support the optimizer plan preview
- OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
+ OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
if (parallelism > 0) {
env.setParallelism(parallelism);
}
- env.setAsContext();
-
- // temporarily write syserr and sysout to a byte array.
- PrintStream originalOut = System.out;
- PrintStream originalErr = System.err;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- System.setOut(new PrintStream(baos));
- ByteArrayOutputStream baes = new ByteArrayOutputStream();
- System.setErr(new PrintStream(baes));
- try {
- ContextEnvironment.enableLocalExecution(false);
- prog.invokeInteractiveModeForExecution();
- }
- catch (ProgramInvocationException e) {
- throw e;
- }
- catch (Throwable t) {
- // the invocation gets aborted with the preview plan
- if (env.optimizerPlan != null) {
- return env.optimizerPlan;
- } else {
- throw new ProgramInvocationException("The program caused an error: ", t);
- }
- }
- finally {
- ContextEnvironment.enableLocalExecution(true);
- System.setOut(originalOut);
- System.setErr(originalErr);
- System.err.println(baes);
- System.out.println(baos);
- }
-
- throw new ProgramInvocationException(
- "The program plan could not be fetched - the program aborted pre-maturely.\n"
- + "System.err: " + baes.toString() + '\n'
- + "System.out: " + baos.toString() + '\n');
- }
- else {
- throw new RuntimeException();
+
+ return env.getOptimizedPlan(prog);
+ } else {
+ throw new RuntimeException("Couldn't determine program mode.");
}
}
-
- public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+
+ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
- LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
+ LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
- return this.compiler.compile(p);
- }
-
-
- /**
- * Creates the optimized plan for a given program, using this client's compiler.
- *
- * @param prog The program to be compiled.
- * @return The compiled and optimized plan, as returned by the compiler.
- * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
- * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
- */
- public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
- return getOptimizedPlan(prog.getPlan(), parallelism);
- }
-
- public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
- return getJobGraph(optPlan, prog.getAllLibraries());
+ return compiler.compile(p);
}
-
- private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
- JobGraph job;
- if (optPlan instanceof StreamingPlan) {
- job = ((StreamingPlan) optPlan).getJobGraph();
- } else {
- JobGraphGenerator gen = new JobGraphGenerator(this.configuration);
- job = gen.compileJobGraph((OptimizedPlan) optPlan);
- }
- for (File jar : jarFiles) {
- job.addJar(new Path(jar.getAbsolutePath()));
+ // ------------------------------------------------------------------------
+ // Program submission / execution
+ // ------------------------------------------------------------------------
+
+ public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+ Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+ if (prog.isUsingProgramEntryPoint()) {
+ return runBlocking(prog.getPlanWithJars(), parallelism);
}
+ else if (prog.isUsingInteractiveMode()) {
+ LOG.info("Starting program in interactive mode");
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+ ContextEnvironment.enableLocalExecution(false);
- return job;
+ // invoke here
+ try {
+ prog.invokeInteractiveModeForExecution();
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ }
+
+ return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+ }
+ else {
+ throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
+ }
}
- public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
+ throws ProgramInvocationException
+ {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
- return run(prog.getPlanWithJars(), parallelism, wait);
+ return runDetached(prog.getPlanWithJars(), parallelism);
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
ContextEnvironment.enableLocalExecution(false);
// invoke here
@@ -281,113 +320,108 @@ public class Client {
ContextEnvironment.enableLocalExecution(true);
}
- // Job id has been set in the Client passed to the ContextEnvironment
- return new JobSubmissionResult(lastJobId);
+ return new JobSubmissionResult(lastJobID);
}
else {
- throw new RuntimeException();
+ throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
}
}
-
- public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
- return run(optimizedPlan, prog.getAllLibraries(), wait);
- }
-
/**
- * Runs a program on Flink cluster whose job-manager is configured in this client's configuration.
- * This method involves all steps, from compiling, job-graph generation to submission.
- *
- * @param prog The program to be executed.
+ * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+ * execution is complete, and returns afterwards.
+ *
+ * @param program The program to be executed.
* @param parallelism The default parallelism to use when running the program. The default parallelism is used
* when the program does not set a parallelism by itself.
- * @param wait A flag that indicates whether this function call should block until the program execution is done.
+ *
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed.
*/
- public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
- return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
+ public JobExecutionResult runBlocking(JobWithJars program, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
+ ClassLoader classLoader = program.getUserCodeClassLoader();
+ if (classLoader == null) {
+ throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+ }
+
+ OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+ return runBlocking(optPlan, program.getJarFiles(), classLoader);
+ }
+
+ /**
+ * Submits a program to the Flink cluster to which this client is connected. The call returns after the
+ * program was submitted and does not wait for the program to complete.
+ *
+ * @param program The program to be executed.
+ * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+ * when the program does not set a parallelism by itself.
+ *
+ * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+ * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+ * or if the submission failed. That might be either due to an I/O problem,
+ * i.e. the job-manager is unreachable.
+ */
+ public JobSubmissionResult runDetached(JobWithJars program, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
+ ClassLoader classLoader = program.getUserCodeClassLoader();
+ if (classLoader == null) {
+ throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+ }
+
+ OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
+ return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
}
- public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+ public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+ throws ProgramInvocationException
+ {
JobGraph job = getJobGraph(compiledPlan, libraries);
- this.lastJobId = job.getJobID();
- return run(job, wait);
+ return runBlocking(job, classLoader);
}
- public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
- this.lastJobId = jobGraph.getJobID();
-
- LOG.info("Starting client actor system");
- final ActorSystem actorSystem;
+ public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+ throws ProgramInvocationException
+ {
+ JobGraph job = getJobGraph(compiledPlan, libraries);
+ return runDetached(job, classLoader);
+ }
+
+ public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+ LOG.info("Checking and uploading JAR files");
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+ } catch (IOException e) {
+ throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
}
- catch (Exception e) {
- throw new ProgramInvocationException("Could start client actor system.", e);
+ try {
+ this.lastJobID = jobGraph.getJobID();
+ return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+ } catch (JobExecutionException e) {
+ throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}
+ }
+ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+ LOG.info("Checking and uploading JAR files");
try {
- FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
- FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-
- LOG.info("Looking up JobManager");
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
-
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path());
-
- LOG.info("JobManager runs at " + jobManagerGateway.path());
-
- LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
-
- LOG.info("Checking and uploading JAR files");
- try {
- JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
- } catch (IOException e) {
- throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
- }
-
- try {
- if (wait) {
- return JobClient.submitJobAndWait(actorSystem,
- jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
- } else {
- JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
- // return a dummy execution result with the JobId
- return new JobSubmissionResult(jobGraph.getJobID());
- }
- } catch (JobExecutionException e) {
+ JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+ }
+ catch (IOException e) {
+ throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
+ }
+ try {
+ this.lastJobID = jobGraph.getJobID();
+ JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+ return new JobSubmissionResult(jobGraph.getJobID());
+ } catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
- } catch (Exception e) {
- throw new ProgramInvocationException("Exception during program execution.", e);
- }
- } finally {
- // shut down started actor system
- actorSystem.shutdown();
-
- // wait at most for 30 seconds, to work around an occasional akka problem
- actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
}
}
@@ -397,62 +431,26 @@ public class Client {
* @throws Exception In case an error occurred.
*/
public void cancel(JobID jobId) throws Exception {
- final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
- final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
- ActorSystem actorSystem;
+ Future<Object> response;
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
} catch (Exception e) {
- throw new ProgramInvocationException("Could start client actor system.", e);
+ throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
}
- try {
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
+ Object result = Await.result(response, timeout);
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- Future<Object> response;
- try {
- response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
- } catch (Exception e) {
- throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
- }
-
- Object result = Await.result(response, timeout);
-
- if (result instanceof JobManagerMessages.CancellationSuccess) {
- LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
- } else if (result instanceof JobManagerMessages.CancellationFailure) {
- Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
- LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
- throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
- } else {
- throw new Exception("Unknown message received while cancelling.");
- }
- } finally {
- // shut down started actor system
- actorSystem.shutdown();
- actorSystem.awaitTermination();
+ if (result instanceof JobManagerMessages.CancellationSuccess) {
+ LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+ } else if (result instanceof JobManagerMessages.CancellationFailure) {
+ Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+ LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+ throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+ } else {
+ throw new Exception("Unknown message received while cancelling.");
}
}
-
/**
* Requests and returns the accumulators for the given job identifier. Accumulators can be
* requested while a is running or after it has finished. The default class loader is used
@@ -473,117 +471,98 @@ public class Client {
*/
public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
- final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
- final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
- ActorSystem actorSystem;
+ Future<Object> response;
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
} catch (Exception e) {
- throw new Exception("Could start client actor system.", e);
+ throw new Exception("Failed to query the job manager gateway for accumulators.", e);
}
- try {
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
-
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- Future<Object> response;
- try {
- response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
- } catch (Exception e) {
- throw new Exception("Failed to query the job manager gateway for accumulators.", e);
- }
-
- Object result = Await.result(response, timeout);
+ Object result = Await.result(response, timeout);
- if (result instanceof AccumulatorResultsFound) {
- Map<String, SerializedValue<Object>> serializedAccumulators =
- ((AccumulatorResultsFound) result).result();
+ if (result instanceof AccumulatorResultsFound) {
+ Map<String, SerializedValue<Object>> serializedAccumulators =
+ ((AccumulatorResultsFound) result).result();
- return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+ return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
- } else if (result instanceof AccumulatorResultsErroneous) {
- throw ((AccumulatorResultsErroneous) result).cause();
- } else {
- throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
- }
- } finally {
- actorSystem.shutdown();
- actorSystem.awaitTermination();
+ } else if (result instanceof AccumulatorResultsErroneous) {
+ throw ((AccumulatorResultsErroneous) result).cause();
+ } else {
+ throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
}
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Sessions
+ // ------------------------------------------------------------------------
- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
-
- private final Optimizer compiler;
-
- private FlinkPlan optimizerPlan;
-
-
- private OptimizerPlanEnvironment(Optimizer compiler) {
- this.compiler = compiler;
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- Plan plan = createProgramPlan(jobName);
- this.optimizerPlan = compiler.compile(plan);
-
- // do not go on with anything now!
- throw new ProgramAbortException();
+ /**
+ * Tells the JobManager to finish the session (job) defined by the given ID.
+ *
+ * @param jobId The ID that identifies the session.
+ */
+ public void endSession(JobID jobId) throws Exception {
+ if (jobId == null) {
+ throw new IllegalArgumentException("The JobID must not be null.");
}
+ endSessions(Collections.singletonList(jobId));
+ }
- @Override
- public String getExecutionPlan() throws Exception {
- Plan plan = createProgramPlan(null, false);
- this.optimizerPlan = compiler.compile(plan);
-
- // do not go on with anything now!
- throw new ProgramAbortException();
- }
-
- private void setAsContext() {
- ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- return OptimizerPlanEnvironment.this;
- }
- };
- initializeContextEnvironment(factory);
+ /**
+ * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+ *
+ * @param jobIds The IDs that identify the sessions.
+ */
+ public void endSessions(List<JobID> jobIds) throws Exception {
+ if (jobIds == null) {
+ throw new IllegalArgumentException("The JobIDs must not be null");
}
- public void setPlan(FlinkPlan plan){
- this.optimizerPlan = plan;
+ for (JobID jid : jobIds) {
+ if (jid != null) {
+ LOG.info("Telling job manager to end the session {}.", jid);
+ jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+ }
}
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Internal translation methods
+ // ------------------------------------------------------------------------
/**
- * A special exception used to abort programs when the caller is only interested in the
- * program plan, rather than in the full execution.
+ * Creates the optimized plan for a given program, using this client's compiler.
+ *
+ * @param prog The program to be compiled.
+ * @return The compiled and optimized plan, as returned by the compiler.
+ * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+ * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
*/
- public static final class ProgramAbortException extends Error {
- private static final long serialVersionUID = 1L;
+ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException,
+ ProgramInvocationException {
+ return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}
+
+ public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+ return getJobGraph(optPlan, prog.getAllLibraries());
+ }
+
+ private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+ JobGraph job;
+ if (optPlan instanceof StreamingPlan) {
+ job = ((StreamingPlan) optPlan).getJobGraph();
+ } else {
+ JobGraphGenerator gen = new JobGraphGenerator();
+ job = gen.compileJobGraph((OptimizedPlan) optPlan);
+ }
+
+ for (File jar : jarFiles) {
+ job.addJar(new Path(jar.getAbsolutePath()));
+ }
+
+ return job;
+ }
+
}
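
The reworked Client now separates blocking and detached submission explicitly. A compact sketch of both paths plus ending a session (illustrative only; the configuration contents, JAR path, and parallelism are placeholders):

import java.io.File;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;

public class ClientUsageSketch {

	// Blocking submission: waits until the job has finished and returns its execution result.
	static JobExecutionResult submitAndWait(Configuration config, int parallelism) throws Exception {
		// the entry point class is read from the JAR manifest; the path is a placeholder
		PackagedProgram program = new PackagedProgram(new File("/path/to/job.jar"));

		Client client = new Client(config);  // starts the client actor system and resolves the JobManager
		try {
			client.setPrintStatusDuringExecution(true);
			return client.runBlocking(program, parallelism);
		}
		finally {
			client.shutdown();
		}
	}

	// Detached submission: returns right after the job graph was handed to the JobManager.
	static JobSubmissionResult submitDetached(Configuration config, PackagedProgram program, int parallelism) throws Exception {
		Client client = new Client(config);
		try {
			return client.runDetached(program, parallelism);
		}
		finally {
			client.shutdown();
		}
	}

	// Ending a session tells the JobManager to drop the cached job for the given ID.
	static void endSession(Configuration config, JobID jobId) throws Exception {
		Client client = new Client(config);
		try {
			client.endSession(jobId);
		}
		finally {
			client.shutdown();
		}
	}
}
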
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9287017..ad14a06 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
private final Client client;
-
+
private final List<File> jarFilesToAttach;
-
+
private final ClassLoader userCodeClassLoader;
private final boolean wait;
-
-
-
+
+
public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
@@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
- JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
- if(result instanceof JobExecutionResult) {
- this.lastJobExecutionResult = (JobExecutionResult) result;
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+ if (wait) {
+ this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+ return this.lastJobExecutionResult;
+ }
+ else {
+ JobSubmissionResult result = client.runDetached(toRun, getParallelism());
+ LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+ this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
return this.lastJobExecutionResult;
}
}
@Override
public String getExecutionPlan() throws Exception {
- Plan p = createProgramPlan("unnamed job");
-
- OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
+ Plan plan = createProgramPlan("unnamed job");
+ OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}
+ @Override
+ public void startNewSession() throws Exception {
+ client.endSession(jobID);
+ jobID = JobID.generate();
+ }
+
public boolean isWait() {
return wait;
}
@@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
static void setAsContext(Client client, List<File> jarFilesToAttach,
ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
- initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
+ ContextEnvironmentFactory factory =
+ new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+ initializeContextEnvironment(factory);
}
protected static void enableLocalExecution(boolean enabled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b86487f..9e84e2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client.program;
import java.io.File;
@@ -30,6 +29,10 @@ import java.util.List;
import org.apache.flink.api.common.Plan;
+/**
+ * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
+ * the classes of the functions and libraries necessary for the execution.
+ */
public class JobWithJars {
private Plan plan;
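
To make the new class comment concrete, a small sketch (not part of the commit) of bundling a plan with a user-code JAR and handing it to the RemoteExecutor; the JAR path, host, and port are placeholders.

import java.util.Collections;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.JobWithJars;

public class JobWithJarsUsageSketch {

	static JobExecutionResult runWithUdfJar(Plan plan) throws Exception {
		// the plan plus the JAR containing its user-defined functions
		JobWithJars job = new JobWithJars(plan, Collections.singletonList("/path/to/udf-functions.jar"));

		// without an explicit start(), the executor opens and closes a session just for this call
		RemoteExecutor executor = new RemoteExecutor("jobmanager-host", 6123);
		return executor.executePlanWithJars(job);
	}
}
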
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
new file mode 100644
index 0000000..c9c3b45
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class OptimizerPlanEnvironment extends ExecutionEnvironment {
+
+ private final Optimizer compiler;
+
+ private FlinkPlan optimizerPlan;
+
+ public OptimizerPlanEnvironment(Optimizer compiler) {
+ this.compiler = compiler;
+ }
+
+ // ------------------------------------------------------------------------
+ // Execution Environment methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ Plan plan = createProgramPlan(jobName);
+ this.optimizerPlan = compiler.compile(plan);
+
+ // do not go on with anything now!
+ throw new ProgramAbortException();
+ }
+
+ @Override
+ public String getExecutionPlan() throws Exception {
+ Plan plan = createProgramPlan(null, false);
+ this.optimizerPlan = compiler.compile(plan);
+
+ // do not go on with anything now!
+ throw new ProgramAbortException();
+ }
+
+ @Override
+ public void startNewSession() {
+ // do nothing
+ }
+
+ public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
+ setAsContext();
+
+ // temporarily write syserr and sysout to a byte array.
+ PrintStream originalOut = System.out;
+ PrintStream originalErr = System.err;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+ ByteArrayOutputStream baes = new ByteArrayOutputStream();
+ System.setErr(new PrintStream(baes));
+ try {
+ ContextEnvironment.enableLocalExecution(false);
+ prog.invokeInteractiveModeForExecution();
+ }
+ catch (ProgramInvocationException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ // the invocation gets aborted with the preview plan
+ if (optimizerPlan != null) {
+ return optimizerPlan;
+ } else {
+ throw new ProgramInvocationException("The program caused an error: ", t);
+ }
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ System.err.println(baes);
+ System.out.println(baos);
+ }
+
+ throw new ProgramInvocationException(
+ "The program plan could not be fetched - the program aborted pre-maturely.\n"
+ + "System.err: " + baes.toString() + '\n'
+ + "System.out: " + baos.toString() + '\n');
+ }
+ // ------------------------------------------------------------------------
+
+ private void setAsContext() {
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return OptimizerPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public void setPlan(FlinkPlan plan){
+ this.optimizerPlan = plan;
+ }
+
+ /**
+ * A special exception used to abort programs when the caller is only interested in the
+ * program plan, rather than in the full execution.
+ */
+ public static final class ProgramAbortException extends Error {
+ private static final long serialVersionUID = 1L;
+ }
+}
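
For reference, a sketch (illustrative only) of how the extracted OptimizerPlanEnvironment is used to obtain a program's optimized plan without executing it; the Optimizer construction mirrors the one used elsewhere in this patch.

import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;

public class PlanPreviewSketch {

	// The environment lets the interactive-mode program run up to its execute() call,
	// compiles the plan, and then aborts instead of submitting anything.
	static FlinkPlan previewPlan(PackagedProgram program, int parallelism) throws Exception {
		Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());

		OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
		if (parallelism > 0) {
			env.setParallelism(parallelism);
		}
		return env.getOptimizedPlan(program);
	}
}
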
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 10096da..091a959 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -40,12 +40,9 @@ import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -166,7 +163,7 @@ public class PackagedProgram {
}
}
- public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
+ PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
this.jarFile = null;
this.args = args == null ? new String[0] : args;
@@ -685,51 +682,5 @@ public class PackagedProgram {
throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
}
}
-
- // --------------------------------------------------------------------------------------------
-
- public static final class PreviewPlanEnvironment extends ExecutionEnvironment {
-
- private List<DataSinkNode> previewPlan;
- private Plan plan;
-
- private String preview = null;
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- this.plan = createProgramPlan(jobName);
- this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan);
-
- // do not go on with anything now!
- throw new Client.ProgramAbortException();
- }
- @Override
- public String getExecutionPlan() throws Exception {
- Plan plan = createProgramPlan("unused");
- this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
-
- // do not go on with anything now!
- throw new Client.ProgramAbortException();
- }
-
- public void setAsContext() {
- ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- return PreviewPlanEnvironment.this;
- }
- };
- initializeContextEnvironment(factory);
- }
-
- public Plan getPlan() {
- return this.plan;
- }
-
- public void setPreview(String preview) {
- this.preview = preview;
- }
-
- }
}