You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/02 11:56:19 UTC

[4/4] flink git commit: [FLINK-2762][cli] print execution result only if available

[FLINK-2762][cli] print execution result only if available


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/142a8440
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/142a8440
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/142a8440

Branch: refs/heads/master
Commit: 142a8440f3e31be3c5c83f87a36cf468c103a9e2
Parents: fc7369e
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 15:58:53 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 21 +++++++++++---------
 .../org/apache/flink/client/program/Client.java |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index d071cdb..034227e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,12 +39,12 @@ import java.util.Properties;
 import akka.actor.ActorSystem;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.InfoOptions;
@@ -638,6 +638,8 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+		LOG.info("Starting execution of program");
+
 		JobSubmissionResult result;
 		try {
 			result = client.runDetached(program, parallelism);
@@ -662,7 +664,7 @@ public class CliFrontend {
 	protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
 		LOG.info("Starting execution of program");
 
-		JobExecutionResult result;
+		JobSubmissionResult result;
 		try {
 			client.setPrintStatusDuringExecution(true);
 			result = client.runBlocking(program, parallelism);
@@ -676,13 +678,14 @@ public class CliFrontend {
 
 		LOG.info("Program execution finished");
 
-		if (!webFrontend) {
-			System.out.println("Job with JobID " + result.getJobID() + " has finished.");
-			System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
-			Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+		if (result instanceof JobExecutionResult && !webFrontend) {
+			JobExecutionResult execResult = (JobExecutionResult) result;
+			System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
+			System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
+			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
 			if (accumulatorsResult.size() > 0) {
-				System.out.println("Accumulator Results: ");
-				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+					System.out.println("Accumulator Results: ");
+					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c72681d..91ed665 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -275,7 +275,7 @@ public class Client {
 	//  Program submission / execution
 	// ------------------------------------------------------------------------
 
-	public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+	public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 			return runBlocking(prog.getPlanWithJars(), parallelism);
@@ -292,7 +292,7 @@ public class Client {
 				ContextEnvironment.unsetContext();
 			}
 
-			return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+			return new JobSubmissionResult(lastJobID);
 		}
 		else {
 			throw new RuntimeException();