You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:23 UTC
[5/7] flink git commit: [FLINK-6358] [gelly] Write job details for
Gelly examples
[FLINK-6358] [gelly] Write job details for Gelly examples
Add an option to write job details to a file in JSON format. Job details
include: job ID, runtime, parameters with values, and accumulators with
values.
This closes #4170
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273223f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273223f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273223f0
Branch: refs/heads/master
Commit: 273223f0143f4af2ac8416af5322e384ec02ab1f
Parents: be4853d
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jun 21 10:25:57 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:22 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/flink/graph/Runner.java | 97 +++++++++++++++++++-
1 file changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/273223f0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index af2a11c..d0e6a92 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
@@ -48,14 +49,24 @@ import org.apache.flink.graph.drivers.input.StarGraph;
import org.apache.flink.graph.drivers.output.Hash;
import org.apache.flink.graph.drivers.output.Output;
import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.InstantiationUtil;
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.commons.lang3.text.StrBuilder;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* This default main class executes Flink drivers.
@@ -70,7 +81,8 @@ import java.util.List;
* <p>Algorithms must explicitly support each type of output via implementation of
* interfaces. This is scalable as the number of outputs is small and finite.
*/
-public class Runner {
+public class Runner
+extends ParameterizedBase {
private static final String INPUT = "input";
@@ -108,6 +120,27 @@ public class Runner {
.addClass(Hash.class)
.addClass(Print.class);
+ private final ParameterTool parameters;
+
+ private final BooleanParameter disableObjectReuse = new BooleanParameter(this, "__disable_object_reuse");
+
+ private final StringParameter jobDetailsPath = new StringParameter(this, "__job_details_path")
+ .setDefaultValue(null);
+
+ /**
+ * Create an algorithm runner from the given arguments.
+ *
+ * @param args command-line arguments
+ */
+ public Runner(String[] args) {
+ parameters = ParameterTool.fromArgs(args);
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
/**
* List available algorithms. This is displayed to the user when no valid
* algorithm is given in the program parameterization.
@@ -192,21 +225,26 @@ public class Runner {
.toString();
}
- public static void main(String[] args) throws Exception {
+ public void run() throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();
// should not have any non-Flink data types
- config.disableAutoTypeRegistration();
config.disableForceAvro();
config.disableForceKryo();
- ParameterTool parameters = ParameterTool.fromArgs(args);
config.setGlobalJobParameters(parameters);
+ // configure local parameters and throw proper exception on error
+ try {
+ this.configure(parameters);
+ } catch (RuntimeException ex) {
+ throw new ProgramParametrizationException(ex.getMessage());
+ }
+
// integration tests run with object reuse both disabled and enabled
- if (parameters.has("__disable_object_reuse")) {
+ if (disableObjectReuse.getValue()) {
config.disableObjectReuse();
} else {
config.enableObjectReuse();
@@ -296,6 +334,55 @@ public class Runner {
}
algorithm.printAnalytics(System.out);
+
+ if (jobDetailsPath.getValue() != null) {
+ writeJobDetails(env, jobDetailsPath.getValue());
+ }
+ }
+
+ /**
+ * Write the following job details as a JSON encoded file: runtime environment,
+ * job ID, runtime, parameters, and accumulators.
+ *
+ * @param env the execution environment
+ * @param jobDetailsPath filesystem path to write job details
+ * @throws IOException on error writing to jobDetailsPath
+ */
+ private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath) throws IOException {
+ JobExecutionResult result = env.getLastJobExecutionResult();
+
+ File jsonFile = new File(jobDetailsPath);
+
+ try (JsonGenerator json = new JsonFactory().createGenerator(jsonFile, JsonEncoding.UTF8)) {
+ json.writeStartObject();
+
+ json.writeObjectFieldStart("Apache Flink");
+ json.writeStringField("version", EnvironmentInformation.getVersion());
+ json.writeStringField("commit ID", EnvironmentInformation.getRevisionInformation().commitId);
+ json.writeStringField("commit date", EnvironmentInformation.getRevisionInformation().commitDate);
+ json.writeEndObject();
+
+ json.writeStringField("job_id", result.getJobID().toString());
+ json.writeNumberField("runtime_ms", result.getNetRuntime());
+
+ json.writeObjectFieldStart("parameters");
+ for (Map.Entry<String, String> entry : env.getConfig().getGlobalJobParameters().toMap().entrySet()) {
+ json.writeStringField(entry.getKey(), entry.getValue());
+ }
+ json.writeEndObject();
+
+ json.writeObjectFieldStart("accumulators");
+ for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+ json.writeStringField(entry.getKey(), entry.getValue().toString());
+ }
+ json.writeEndObject();
+
+ json.writeEndObject();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new Runner(args).run();
}
/**