You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/09 22:20:31 UTC
flink git commit: [FLINK-1974] Fix getNetRuntime() of
JobExecutionResult and add documentation
Repository: flink
Updated Branches:
refs/heads/master 170366c90 -> 7e5a97062
[FLINK-1974] Fix getNetRuntime() of JobExecutionResult and add documentation
- Fix JobInfo to report milliseconds
- Added documentation to indicate that the return type is in milliseconds
- Added an getNetRuntime method which accepts a desired time unit for easy conversion
This closes #652
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e5a9706
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e5a9706
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e5a9706
Branch: refs/heads/master
Commit: 7e5a97062c8d5d322be81a9d8af35f942a929f57
Parents: 170366c
Author: Johannes <jk...@gmail.com>
Authored: Tue May 5 17:32:41 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 9 21:53:28 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/client/CliFrontend.java | 2 +-
.../apache/flink/api/common/JobExecutionResult.java | 16 ++++++++++++++--
.../client/SerializedJobExecutionResult.java | 14 +++++++++++++-
.../apache/flink/runtime/jobmanager/JobInfo.scala | 2 +-
.../client/SerializedJobExecutionResultTest.java | 4 ++++
.../java/record/io/jdbc/example/JDBCExample.java | 2 +-
.../flink/test/recordJobs/wordcount/WordCount.java | 11 ++++++-----
.../flink/test/recovery/SimpleRecoveryITCase.java | 2 +-
.../test/runtime/NetworkStackThroughputITCase.java | 3 ++-
9 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a13b322..6ca8c4d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -609,7 +609,7 @@ public class CliFrontend {
}
if (execResult instanceof JobExecutionResult) {
JobExecutionResult result = (JobExecutionResult) execResult;
- System.out.println("Job Runtime: " + result.getNetRuntime());
+ System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 62848f7..bf06c75 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* The result of a job execution. Gives access to the execution time of the job,
@@ -34,7 +35,7 @@ public class JobExecutionResult extends JobSubmissionResult {
* Creates a new JobExecutionResult.
*
* @param jobID The job's ID.
- * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+ * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulators produced by the job.
*/
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
@@ -47,12 +48,23 @@ public class JobExecutionResult extends JobSubmissionResult {
* Gets the net execution time of the job, i.e., the execution time in the parallel system,
* without the pre-flight steps like the optimizer.
*
- * @return The net execution time.
+ * @return The net execution time in milliseconds.
*/
public long getNetRuntime() {
return this.netRuntime;
}
+ /**
+ * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+ * without the pre-flight steps like the optimizer in a desired time unit.
+ *
+ * @param desiredUnit the unit of the <tt>NetRuntime</tt>
+ * @return The net execution time in the desired unit.
+ */
+ public long getNetRuntime(TimeUnit desiredUnit) {
+ return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+ }
+
/**
* Gets the accumulator with the given name. Returns {@code null}, if no accumulator with
* that name was produced.
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
index e0b1ad1..029bc38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* A variant of the {@link org.apache.flink.api.common.JobExecutionResult} that holds
@@ -45,7 +46,7 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
* Creates a new SerializedJobExecutionResult.
*
* @param jobID The job's ID.
- * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
+ * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulator results produced by the job, in serialized form
*/
public SerializedJobExecutionResult(JobID jobID, long netRuntime,
@@ -63,6 +64,17 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
return netRuntime;
}
+ /**
+ * Gets the net execution time of the job, i.e., the execution time in the parallel system,
+ * without the pre-flight steps like the optimizer in a desired time unit.
+ *
+ * @param desiredUnit the unit of the <tt>NetRuntime</tt>
+ * @return The net execution time in the desired unit.
+ */
+ public long getNetRuntime(TimeUnit desiredUnit) {
+ return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
+ }
+
public Map<String, SerializedValue<Object>> getSerializedAccumulatorResults() {
return this.accumulatorResults;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 4b7446c..26d7272 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -34,7 +34,7 @@ class JobInfo(val client: ActorRef, val start: Long){
def duration: Long = {
if(end != -1){
- (end - start)/1000
+ end - start
}else{
-1
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index f58bbe1..5c9ffa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -53,6 +54,7 @@ public class SerializedJobExecutionResultTest {
assertEquals(origJobId, cloned.getJobId());
assertEquals(origTime, cloned.getNetRuntime());
+ assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origMap, cloned.getSerializedAccumulatorResults());
// convert to deserialized result
@@ -62,7 +64,9 @@ public class SerializedJobExecutionResultTest {
assertEquals(origJobId, jResult.getJobID());
assertEquals(origJobId, jResultCopied.getJobID());
assertEquals(origTime, jResult.getNetRuntime());
+ assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origTime, jResultCopied.getNetRuntime());
+ assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
for (Map.Entry<String, SerializedValue<Object>> entry : origMap.entrySet()) {
String name = entry.getKey();
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
index 58aaf9e..213fd6a 100644
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
@@ -86,7 +86,7 @@ public class JDBCExample implements Program, ProgramDescription {
prepareTestDb();
JDBCExample tut = new JDBCExample();
JobExecutionResult res = LocalExecutor.execute(tut, args);
- System.out.println("runtime: " + res.getNetRuntime());
+ System.out.println("runtime: " + res.getNetRuntime() + " ms");
System.exit(0);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
index 53b2663..96eb1fc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/wordcount/WordCount.java
@@ -18,16 +18,13 @@
package org.apache.flink.test.recordJobs.wordcount;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
@@ -41,6 +38,10 @@ import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
/**
* Implements a word count which takes the input file and counts the number of
* the occurrences of each word in the file.
@@ -154,6 +155,6 @@ public class WordCount implements Program, ProgramDescription {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
JobExecutionResult result = LocalExecutor.execute(plan);
- System.err.println("Total runtime: " + result.getNetRuntime());
+ System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index e61e551..e2f5a71 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -96,7 +96,7 @@ public class SimpleRecoveryITCase {
try {
JobExecutionResult res = env.execute();
- String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
+ String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
fail("The program should have failed, but returned " + msg);
}
catch (ProgramInvocationException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7e5a9706/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 36c7cba..7b43266 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
@Ignore
public class NetworkStackThroughputITCase {
@@ -143,7 +144,7 @@ public class NetworkStackThroughputITCase {
int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
long dataVolumeMbit = dataVolumeGb * 8192;
- long runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000;
+ long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);