You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:42 UTC
[5/6] flink git commit: [FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.
[FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c943e68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c943e68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c943e68
Branch: refs/heads/release-1.2
Commit: 3c943e6820525180089f3d402010e138fd9af54d
Parents: 37bffdd
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../graph/drivers/ClusteringCoefficient.java | 2 +-
.../apache/flink/graph/drivers/Graph500.java | 37 ++++++++++++++++----
.../flink/graph/drivers/GraphMetrics.java | 11 +++---
.../org/apache/flink/graph/drivers/HITS.java | 11 +++---
.../flink/graph/drivers/JaccardIndex.java | 11 +++---
.../flink/graph/drivers/TriangleListing.java | 7 ++--
.../flink/graph/AbstractGraphAnalytic.java | 5 +--
.../org/apache/flink/graph/AnalyticHelper.java | 8 ++++-
8 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index cd28ee4..79a17a4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -78,7 +78,7 @@ public class ClusteringCoefficient {
.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
" the vertex, and the number of edges between vertex neighbors.", 80))
.appendNewLine()
- .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..a4e7c01 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -53,8 +52,6 @@ public class Graph500 {
private static final int DEFAULT_EDGE_FACTOR = 16;
- private static final boolean DEFAULT_SIMPLIFY = false;
-
private static final boolean DEFAULT_CLIP_AND_FLIP = true;
private static String getUsage(String message) {
@@ -68,6 +65,9 @@ public class Graph500 {
.appendNewLine()
.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
.appendNewLine()
+ .appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv>")
+ .appendNewLine()
+ .appendln("options:")
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
@@ -84,6 +84,17 @@ public class Graph500 {
ParameterTool parameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameters);
+ if (! parameters.has("directed")) {
+ throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+ }
+ boolean directed = parameters.getBoolean("directed");
+
+ if (! parameters.has("simplify")) {
+ throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
+ }
+ boolean simplify = parameters.getBoolean("simplify");
+
+
// Generate RMat graph
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
@@ -93,14 +104,23 @@ public class Graph500 {
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
- boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
- if (simplify) {
- graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ if (directed) {
+ if (simplify) {
+ graph = graph
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+ }
+ } else {
+ if (simplify) {
+ graph = graph
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ } else {
+ graph = graph.getUndirected();
+ }
}
DataSet<Tuple2<LongValue,LongValue>> edges = graph
@@ -110,15 +130,17 @@ public class Graph500 {
// Print, hash, or write RMat graph to disk
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
edges.print();
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(edges));
break;
case "csv":
- String filename = parameters.get("filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -137,6 +159,7 @@ public class Graph500 {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 899ae66..9b246df 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -63,7 +63,7 @@ public class GraphMetrics {
.appendNewLine()
.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
.appendNewLine()
- .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+ .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -98,7 +98,7 @@ public class GraphMetrics {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -225,14 +225,17 @@ public class GraphMetrics {
env.execute("Graph Metrics");
+ System.out.println();
System.out.print("Vertex metrics:\n ");
System.out.println(vm.getResult().toString().replace(";", "\n "));
- System.out.print("\nEdge metrics:\n ");
+ System.out.println();
+ System.out.print("Edge metrics:\n ");
System.out.println(em.getResult().toString().replace(";", "\n "));
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
- System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ System.out.println();
+ System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index b035bd7..453b543 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -69,7 +69,7 @@ public class HITS {
" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
" and good \"authorities\" are linked from good \"hubs\".", 80))
.appendNewLine()
- .appendln("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -104,7 +104,7 @@ public class HITS {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -157,17 +157,19 @@ public class HITS {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
for (Object e: hits.collect()) {
System.out.println(((Result)e).toVerboseString());
}
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(hits));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -177,7 +179,7 @@ public class HITS {
hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
- env.execute();
+ env.execute("HITS");
break;
default:
throw new ProgramParametrizationException(getUsage("invalid output type"));
@@ -186,6 +188,7 @@ public class HITS {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index cb11af9..abb675a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -75,7 +75,7 @@ public class JaccardIndex {
.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
" number of shared neighbors, and the number of distinct neighbors.", 80))
.appendNewLine()
- .appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -110,7 +110,7 @@ public class JaccardIndex {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -137,7 +137,7 @@ public class JaccardIndex {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
.setParallelism(little_parallelism));
}
@@ -189,6 +189,7 @@ public class JaccardIndex {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
for (Object e: ji.collect()) {
Result result = (Result)e;
System.out.println(result.toVerboseString());
@@ -196,11 +197,12 @@ public class JaccardIndex {
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(ji));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -220,6 +222,7 @@ public class JaccardIndex {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 818b0d8..1fecc3d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -116,7 +116,7 @@ public class TriangleListing {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -284,6 +284,7 @@ public class TriangleListing {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
if (directedAlgorithm) {
for (Object e: tl.collect()) {
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
@@ -296,11 +297,12 @@ public class TriangleListing {
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(tl));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -324,6 +326,7 @@ public class TriangleListing {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
index b13e82e..4d3d055 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -38,14 +38,12 @@ implements GraphAnalytic<K, VV, EV, T> {
public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
throws Exception {
env = input.getContext();
- return null;
+ return this;
}
@Override
public T execute()
throws Exception {
- Preconditions.checkNotNull(env);
-
env.execute();
return getResult();
}
@@ -54,7 +52,6 @@ implements GraphAnalytic<K, VV, EV, T> {
public T execute(String jobName)
throws Exception {
Preconditions.checkNotNull(jobName);
- Preconditions.checkNotNull(env);
env.execute(jobName);
return getResult();
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
index b07a8c3..dbe3e0c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -18,11 +18,13 @@
package org.apache.flink.graph;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
@@ -74,6 +76,10 @@ extends RichOutputFormat<T> {
* @return The value of the accumulator with the given name
*/
public <A> A getAccumulator(ExecutionEnvironment env, String accumulatorName) {
- return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName);
+ JobExecutionResult result = env.getLastJobExecutionResult();
+
+ Preconditions.checkNotNull(result, "No result found for job, was execute() called before getting the result?");
+
+ return result.getAccumulatorResult(id + SEPARATOR + accumulatorName);
}
}