Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:23 UTC

[5/7] flink git commit: [FLINK-6358] [gelly] Write job details for Gelly examples

[FLINK-6358] [gelly] Write job details for Gelly examples

Add an option to write job details to a file in JSON format. Job details
include: job ID, runtime, parameters with values, and accumulators with
values.

This closes #4170
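
The new option is exercised by appending something like "--__job_details_path /tmp/job-details.json" to the usual Gelly examples invocation (the option name comes from the patch below; the path is illustrative). The resulting UTF-8 JSON file contains an "Apache Flink" object (version, commit ID, commit date), "job_id" and "runtime_ms" fields, and "parameters" and "accumulators" objects. As a minimal, hypothetical sketch of consuming that file, assuming jackson-databind is available on the classpath (the patch itself only uses the jackson-core streaming API), a reader might look like:

	import com.fasterxml.jackson.databind.JsonNode;
	import com.fasterxml.jackson.databind.ObjectMapper;

	import java.io.File;
	import java.io.IOException;
	import java.util.Iterator;
	import java.util.Map;

	public class JobDetailsReader {

		public static void main(String[] args) throws IOException {
			// parse the JSON file written by Runner#writeJobDetails
			JsonNode details = new ObjectMapper().readTree(new File(args[0]));

			// field names match those written by the patch below
			System.out.println("job ID:     " + details.get("job_id").asText());
			System.out.println("runtime ms: " + details.get("runtime_ms").asLong());

			// accumulators are written as flat string fields
			Iterator<Map.Entry<String, JsonNode>> accumulators = details.get("accumulators").fields();
			while (accumulators.hasNext()) {
				Map.Entry<String, JsonNode> entry = accumulators.next();
				System.out.println(entry.getKey() + " = " + entry.getValue().asText());
			}
		}
	}

This reader is only a convenience sketch for consumers; the Runner itself stays on the lighter jackson-core streaming JsonGenerator, as shown in the diff.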


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273223f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273223f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273223f0

Branch: refs/heads/master
Commit: 273223f0143f4af2ac8416af5322e384ec02ab1f
Parents: be4853d
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jun 21 10:25:57 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:22 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/graph/Runner.java     | 97 +++++++++++++++++++-
 1 file changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/273223f0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index af2a11c..d0e6a92 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -48,14 +49,24 @@ import org.apache.flink.graph.drivers.input.StarGraph;
 import org.apache.flink.graph.drivers.output.Hash;
 import org.apache.flink.graph.drivers.output.Output;
 import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.InstantiationUtil;
 
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.commons.lang3.text.StrBuilder;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This default main class executes Flink drivers.
@@ -70,7 +81,8 @@ import java.util.List;
  * <p>Algorithms must explicitly support each type of output via implementation of
  * interfaces. This is scalable as the number of outputs is small and finite.
  */
-public class Runner {
+public class Runner
+extends ParameterizedBase {
 
 	private static final String INPUT = "input";
 
@@ -108,6 +120,27 @@ public class Runner {
 		.addClass(Hash.class)
 		.addClass(Print.class);
 
+	private final ParameterTool parameters;
+
+	private final BooleanParameter disableObjectReuse = new BooleanParameter(this, "__disable_object_reuse");
+
+	private final StringParameter jobDetailsPath = new StringParameter(this, "__job_details_path")
+		.setDefaultValue(null);
+
+	/**
+	 * Create an algorithm runner from the given arguments.
+	 *
+	 * @param args command-line arguments
+	 */
+	public Runner(String[] args) {
+		parameters = ParameterTool.fromArgs(args);
+	}
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
 	/**
 	 * List available algorithms. This is displayed to the user when no valid
 	 * algorithm is given in the program parameterization.
@@ -192,21 +225,26 @@ public class Runner {
 			.toString();
 	}
 
-	public static void main(String[] args) throws Exception {
+	public void run() throws Exception {
 		// Set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		ExecutionConfig config = env.getConfig();
 
 		// should not have any non-Flink data types
-		config.disableAutoTypeRegistration();
 		config.disableForceAvro();
 		config.disableForceKryo();
 
-		ParameterTool parameters = ParameterTool.fromArgs(args);
 		config.setGlobalJobParameters(parameters);
 
+		// configure local parameters and throw proper exception on error
+		try {
+			this.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
+
 		// integration tests run with object reuse both disabled and enabled
-		if (parameters.has("__disable_object_reuse")) {
+		if (disableObjectReuse.getValue()) {
 			config.disableObjectReuse();
 		} else {
 			config.enableObjectReuse();
@@ -296,6 +334,55 @@ public class Runner {
 		}
 
 		algorithm.printAnalytics(System.out);
+
+		if (jobDetailsPath.getValue() != null) {
+			writeJobDetails(env, jobDetailsPath.getValue());
+		}
+	}
+
+	/**
+	 * Write the following job details as a JSON encoded file: runtime environment,
+	 * job ID, runtime, parameters, and accumulators.
+	 *
+	 * @param env the execution environment
+	 * @param jobDetailsPath filesystem path to write job details
+	 * @throws IOException on error writing to jobDetailsPath
+	 */
+	private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath) throws IOException {
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		File jsonFile = new File(jobDetailsPath);
+
+		try (JsonGenerator json = new JsonFactory().createGenerator(jsonFile, JsonEncoding.UTF8)) {
+			json.writeStartObject();
+
+			json.writeObjectFieldStart("Apache Flink");
+			json.writeStringField("version", EnvironmentInformation.getVersion());
+			json.writeStringField("commit ID", EnvironmentInformation.getRevisionInformation().commitId);
+			json.writeStringField("commit date", EnvironmentInformation.getRevisionInformation().commitDate);
+			json.writeEndObject();
+
+			json.writeStringField("job_id", result.getJobID().toString());
+			json.writeNumberField("runtime_ms", result.getNetRuntime());
+
+			json.writeObjectFieldStart("parameters");
+			for (Map.Entry<String, String> entry : env.getConfig().getGlobalJobParameters().toMap().entrySet()) {
+				json.writeStringField(entry.getKey(), entry.getValue());
+			}
+			json.writeEndObject();
+
+			json.writeObjectFieldStart("accumulators");
+			for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+				json.writeStringField(entry.getKey(), entry.getValue().toString());
+			}
+			json.writeEndObject();
+
+			json.writeEndObject();
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		new Runner(args).run();
 	}
 
 	/**