You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:42 UTC

[5/6] flink git commit: [FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.

[FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c943e68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c943e68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c943e68

Branch: refs/heads/release-1.2
Commit: 3c943e6820525180089f3d402010e138fd9af54d
Parents: 37bffdd
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../graph/drivers/ClusteringCoefficient.java    |  2 +-
 .../apache/flink/graph/drivers/Graph500.java    | 37 ++++++++++++++++----
 .../flink/graph/drivers/GraphMetrics.java       | 11 +++---
 .../org/apache/flink/graph/drivers/HITS.java    | 11 +++---
 .../flink/graph/drivers/JaccardIndex.java       | 11 +++---
 .../flink/graph/drivers/TriangleListing.java    |  7 ++--
 .../flink/graph/AbstractGraphAnalytic.java      |  5 +--
 .../org/apache/flink/graph/AnalyticHelper.java  |  8 ++++-
 8 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index cd28ee4..79a17a4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -78,7 +78,7 @@ public class ClusteringCoefficient {
 			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
 				" the vertex, and the number of edges between vertex neighbors.", 80))
 			.appendNewLine()
-			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..a4e7c01 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -53,8 +52,6 @@ public class Graph500 {
 
 	private static final int DEFAULT_EDGE_FACTOR = 16;
 
-	private static final boolean DEFAULT_SIMPLIFY = false;
-
 	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
 	private static String getUsage(String message) {
@@ -68,6 +65,9 @@ public class Graph500 {
 			.appendNewLine()
 			.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
 			.appendNewLine()
+			.appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv>")
+			.appendNewLine()
+			.appendln("options:")
 			.appendln("  --output print")
 			.appendln("  --output hash")
 			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
@@ -84,6 +84,17 @@ public class Graph500 {
 		ParameterTool parameters = ParameterTool.fromArgs(args);
 		env.getConfig().setGlobalJobParameters(parameters);
 
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directed = parameters.getBoolean("directed");
+
+		if (! parameters.has("simplify")) {
+			throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
+		}
+		boolean simplify = parameters.getBoolean("simplify");
+
+
 		// Generate RMat graph
 		int scale = parameters.getInt("scale", DEFAULT_SCALE);
 		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
@@ -93,14 +104,23 @@ public class Graph500 {
 		long vertexCount = 1L << scale;
 		long edgeCount = vertexCount * edgeFactor;
 
-		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
 			.generate();
 
-		if (simplify) {
-			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		if (directed) {
+			if (simplify) {
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+			}
+		} else {
+			if (simplify) {
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+			} else {
+				graph = graph.getUndirected();
+			}
 		}
 
 		DataSet<Tuple2<LongValue,LongValue>> edges = graph
@@ -110,15 +130,17 @@ public class Graph500 {
 		// Print, hash, or write RMat graph to disk
 		switch (parameters.get("output", "")) {
 		case "print":
+			System.out.println();
 			edges.print();
 			break;
 
 		case "hash":
+			System.out.println();
 			System.out.println(DataSetUtils.checksumHashCode(edges));
 			break;
 
 		case "csv":
-			String filename = parameters.get("filename");
+			String filename = parameters.getRequired("output_filename");
 
 			String lineDelimiter = StringEscapeUtils.unescapeJava(
 				parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -137,6 +159,7 @@ public class Graph500 {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 899ae66..9b246df 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -63,7 +63,7 @@ public class GraphMetrics {
 			.appendNewLine()
 			.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
 			.appendNewLine()
-			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -98,7 +98,7 @@ public class GraphMetrics {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -225,14 +225,17 @@ public class GraphMetrics {
 
 		env.execute("Graph Metrics");
 
+		System.out.println();
 		System.out.print("Vertex metrics:\n  ");
 		System.out.println(vm.getResult().toString().replace(";", "\n "));
-		System.out.print("\nEdge metrics:\n  ");
+		System.out.println();
+		System.out.print("Edge metrics:\n  ");
 		System.out.println(em.getResult().toString().replace(";", "\n "));
 
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+		System.out.println();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index b035bd7..453b543 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -69,7 +69,7 @@ public class HITS {
 				" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
 				" and good \"authorities\" are linked from good \"hubs\".", 80))
 			.appendNewLine()
-			.appendln("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -104,7 +104,7 @@ public class HITS {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -157,17 +157,19 @@ public class HITS {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				for (Object e: hits.collect()) {
 					System.out.println(((Result)e).toVerboseString());
 				}
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(hits));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -177,7 +179,7 @@ public class HITS {
 
 				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
 
-				env.execute();
+				env.execute("HITS");
 				break;
 			default:
 				throw new ProgramParametrizationException(getUsage("invalid output type"));
@@ -186,6 +188,7 @@ public class HITS {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index cb11af9..abb675a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -75,7 +75,7 @@ public class JaccardIndex {
 			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
 				" number of shared neighbors, and the number of distinct neighbors.", 80))
 			.appendNewLine()
-			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -110,7 +110,7 @@ public class JaccardIndex {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -137,7 +137,7 @@ public class JaccardIndex {
 
 						if (parameters.getBoolean("simplify", false)) {
 							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
 									.setParallelism(little_parallelism));
 						}
 
@@ -189,6 +189,7 @@ public class JaccardIndex {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				for (Object e: ji.collect()) {
 					Result result = (Result)e;
 					System.out.println(result.toVerboseString());
@@ -196,11 +197,12 @@ public class JaccardIndex {
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(ji));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -220,6 +222,7 @@ public class JaccardIndex {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 818b0d8..1fecc3d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -116,7 +116,7 @@ public class TriangleListing {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -284,6 +284,7 @@ public class TriangleListing {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				if (directedAlgorithm) {
 					for (Object e: tl.collect()) {
 						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
@@ -296,11 +297,12 @@ public class TriangleListing {
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(tl));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -324,6 +326,7 @@ public class TriangleListing {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
index b13e82e..4d3d055 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -38,14 +38,12 @@ implements GraphAnalytic<K, VV, EV, T> {
 	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
 			throws Exception {
 		env = input.getContext();
-		return null;
+		return this;
 	}
 
 	@Override
 	public T execute()
 			throws Exception {
-		Preconditions.checkNotNull(env);
-
 		env.execute();
 		return getResult();
 	}
@@ -54,7 +52,6 @@ implements GraphAnalytic<K, VV, EV, T> {
 	public T execute(String jobName)
 			throws Exception {
 		Preconditions.checkNotNull(jobName);
-		Preconditions.checkNotNull(env);
 
 		env.execute(jobName);
 		return getResult();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
index b07a8c3..dbe3e0c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.graph;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -74,6 +76,10 @@ extends RichOutputFormat<T> {
 	 * @return The value of the accumulator with the given name
 	 */
 	public <A> A getAccumulator(ExecutionEnvironment env, String accumulatorName) {
-		return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName);
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		Preconditions.checkNotNull(result, "No result found for job, was execute() called before getting the result?");
+
+		return result.getAccumulatorResult(id + SEPARATOR + accumulatorName);
 	}
 }