You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/09 22:20:31 UTC

flink git commit: [FLINK-1974] Fix getNetRuntime() of JobExecutionResult and add documentation

Repository: flink
Updated Branches:
  refs/heads/master 170366c90 -> 7e5a97062


[FLINK-1974] Fix getNetRuntime() of JobExecutionResult and add documentation

- Fix JobInfo to report milliseconds
- Added documentation to indicate that the returned value is in milliseconds
- Added a getNetRuntime method which accepts a desired time unit for easy conversion

This closes #652


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e5a9706
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e5a9706
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e5a9706

Branch: refs/heads/master
Commit: 7e5a97062c8d5d322be81a9d8af35f942a929f57
Parents: 170366c
Author: Johannes <jk...@gmail.com>
Authored: Tue May 5 17:32:41 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 9 21:53:28 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/client/CliFrontend.java   |  2 +-
 .../apache/flink/api/common/JobExecutionResult.java | 16 ++++++++++++++--
 .../client/SerializedJobExecutionResult.java        | 14 +++++++++++++-
 .../apache/flink/runtime/jobmanager/JobInfo.scala   |  2 +-
 .../client/SerializedJobExecutionResultTest.java    |  4 ++++
 .../java/record/io/jdbc/example/JDBCExample.java    |  2 +-
 .../flink/test/recordJobs/wordcount/WordCount.java  | 11 ++++++-----
 .../flink/test/recovery/SimpleRecoveryITCase.java   |  2 +-
 .../test/runtime/NetworkStackThroughputITCase.java  |  3 ++-
 9 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a13b322..6ca8c4d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -609,7 +609,7 @@ public class CliFrontend {
 			}
 			if (execResult instanceof JobExecutionResult) {
 				JobExecutionResult result = (JobExecutionResult) execResult;
-				System.out.println("Job Runtime: " + result.getNetRuntime());
+				System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
 				Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
 				if (accumulatorsResult.size() > 0) {
 					System.out.println("Accumulator Results: ");

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 62848f7..bf06c75 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The result of a job execution. Gives access to the execution time of the job,
@@ -34,7 +35,7 @@ public class JobExecutionResult extends JobSubmissionResult {
 	 * Creates a new JobExecutionResult.
 	 *
 	 * @param jobID The job's ID.
-	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 	 * @param accumulators A map of all accumulators produced by the job.
 	 */
 	public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
@@ -47,12 +48,23 @@ public class JobExecutionResult extends JobSubmissionResult {
 	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
 	 * without the pre-flight steps like the optimizer.
 	 *
-	 * @return The net execution time.
+	 * @return The net execution time in milliseconds.
 	 */
 	public long getNetRuntime() {
 		return this.netRuntime;
 	}
 
+    /**
+	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+	 * without the pre-flight steps like the optimizer in a desired time unit.
+	 *
+	 * @param desiredUnit the unit of the <tt>NetRuntime</tt>
+	 * @return The net execution time in the desired unit.
+	 */
+	public long getNetRuntime(TimeUnit desiredUnit) {
+		return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+	}
+
 	/**
 	 * Gets the accumulator with the given name. Returns {@code null}, if no accumulator with
 	 * that name was produced.

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
index e0b1ad1..029bc38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A variant of the {@link org.apache.flink.api.common.JobExecutionResult} that holds
@@ -45,7 +46,7 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
 	 * Creates a new SerializedJobExecutionResult.
 	 *
 	 * @param jobID The job's ID.
-	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
 	 * @param accumulators A map of all accumulator results produced by the job, in serialized form
 	 */
 	public SerializedJobExecutionResult(JobID jobID, long netRuntime,
@@ -63,6 +64,17 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
 		return netRuntime;
 	}
 
+    /**
+	 * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+	 * without the pre-flight steps like the optimizer in a desired time unit.
+	 *
+	 * @param desiredUnit the unit of the <tt>NetRuntime</tt>
+	 * @return The net execution time in the desired unit.
+	 */
+	public long getNetRuntime(TimeUnit desiredUnit) {
+		return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+	}
+
 	public Map<String, SerializedValue<Object>> getSerializedAccumulatorResults() {
 		return this.accumulatorResults;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 4b7446c..26d7272 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -34,7 +34,7 @@ class JobInfo(val client: ActorRef, val start: Long){
 
   def duration: Long = {
     if(end != -1){
-      (end - start)/1000
+      end - start
     }else{
       -1
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index f58bbe1..5c9ffa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -53,6 +54,7 @@ public class SerializedJobExecutionResultTest {
 
 			assertEquals(origJobId, cloned.getJobId());
 			assertEquals(origTime, cloned.getNetRuntime());
+			assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
 			assertEquals(origMap, cloned.getSerializedAccumulatorResults());
 
 			// convert to deserialized result
@@ -62,7 +64,9 @@ public class SerializedJobExecutionResultTest {
 			assertEquals(origJobId, jResult.getJobID());
 			assertEquals(origJobId, jResultCopied.getJobID());
 			assertEquals(origTime, jResult.getNetRuntime());
+			assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
 			assertEquals(origTime, jResultCopied.getNetRuntime());
+			assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
 
 			for (Map.Entry<String, SerializedValue<Object>> entry : origMap.entrySet()) {
 				String name = entry.getKey();

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
index 58aaf9e..213fd6a 100644
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
@@ -86,7 +86,7 @@ public class JDBCExample implements Program, ProgramDescription {
 		prepareTestDb();
 		JDBCExample tut = new JDBCExample();
 		JobExecutionResult res = LocalExecutor.execute(tut, args);
-		System.out.println("runtime: " + res.getNetRuntime());
+		System.out.println("runtime: " + res.getNetRuntime() + " ms");
 
 		System.exit(0);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
index 53b2663..96eb1fc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.test.recordJobs.wordcount;
 
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.record.functions.MapFunction;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.record.io.CsvOutputFormat;
 import org.apache.flink.api.java.record.io.TextInputFormat;
 import org.apache.flink.api.java.record.operators.FileDataSink;
@@ -41,6 +38,10 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
 
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Implements a word count which takes the input file and counts the number of
  * the occurrences of each word in the file.
@@ -154,6 +155,6 @@ public class WordCount implements Program, ProgramDescription {
 		// This will execute the word-count embedded in a local context. replace this line by the commented
 		// succeeding line to send the job to a local installation or to a cluster for execution
 		JobExecutionResult result = LocalExecutor.execute(plan);
-		System.err.println("Total runtime: " + result.getNetRuntime());
+		System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index e61e551..e2f5a71 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -96,7 +96,7 @@ public class SimpleRecoveryITCase {
 
 				try {
 					JobExecutionResult res = env.execute();
-					String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
+					String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
 					fail("The program should have failed, but returned " + msg);
 				}
 				catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 36c7cba..7b43266 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -143,7 +144,7 @@ public class NetworkStackThroughputITCase {
 				int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
 
 				long dataVolumeMbit = dataVolumeGb * 8192;
-				long runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000;
+				long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
 
 				int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);