You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/02 11:56:19 UTC
[4/4] flink git commit: [FLINK-2762][cli] print execution result only
if available
[FLINK-2762][cli] print execution result only if available
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/142a8440
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/142a8440
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/142a8440
Branch: refs/heads/master
Commit: 142a8440f3e31be3c5c83f87a36cf468c103a9e2
Parents: fc7369e
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 15:58:53 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 21 +++++++++++---------
.../org/apache/flink/client/program/Client.java | 4 ++--
2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index d071cdb..034227e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,12 +39,12 @@ import java.util.Properties;
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.InfoOptions;
@@ -638,6 +638,8 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ LOG.info("Starting execution of program");
+
JobSubmissionResult result;
try {
result = client.runDetached(program, parallelism);
@@ -662,7 +664,7 @@ public class CliFrontend {
protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
LOG.info("Starting execution of program");
- JobExecutionResult result;
+ JobSubmissionResult result;
try {
client.setPrintStatusDuringExecution(true);
result = client.runBlocking(program, parallelism);
@@ -676,13 +678,14 @@ public class CliFrontend {
LOG.info("Program execution finished");
- if (!webFrontend) {
- System.out.println("Job with JobID " + result.getJobID() + " has finished.");
- System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
- Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+ if (result instanceof JobExecutionResult && !webFrontend) {
+ JobExecutionResult execResult = (JobExecutionResult) result;
+ System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
+ System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
+ Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c72681d..91ed665 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -275,7 +275,7 @@ public class Client {
// Program submission / execution
// ------------------------------------------------------------------------
- public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+ public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return runBlocking(prog.getPlanWithJars(), parallelism);
@@ -292,7 +292,7 @@ public class Client {
ContextEnvironment.unsetContext();
}
- return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+ return new JobSubmissionResult(lastJobID);
}
else {
throw new RuntimeException();