You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:29 UTC
[6/6] flink git commit: [FLINK-5913] [gelly] Example drivers
[FLINK-5913] [gelly] Example drivers
Replace existing and create new algorithm Driver implementations for
each of the library methods.
This closes #3635
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a48357db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a48357db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a48357db
Branch: refs/heads/master
Commit: a48357db8c4187fd08f3b17880899ebbcb5d3b5e
Parents: ded25be
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Fri Mar 31 11:17:26 2017 -0400
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Usage.java | 2 -
.../apache/flink/graph/drivers/AdamicAdar.java | 71 ++++
.../graph/drivers/ClusteringCoefficient.java | 378 +++++--------------
.../graph/drivers/ConnectedComponents.java | 105 ++++++
.../apache/flink/graph/drivers/EdgeList.java | 92 +++++
.../apache/flink/graph/drivers/Graph500.java | 165 --------
.../flink/graph/drivers/GraphMetrics.java | 265 ++++---------
.../org/apache/flink/graph/drivers/HITS.java | 188 ++-------
.../flink/graph/drivers/JaccardIndex.java | 224 ++---------
.../apache/flink/graph/drivers/PageRank.java | 74 ++++
.../flink/graph/drivers/SimpleDriver.java | 65 ++++
.../flink/graph/drivers/TriangleListing.java | 362 +++++-------------
.../drivers/parameter/IterationConvergence.java | 89 +++++
.../graph/examples/ConnectedComponents.java | 141 -------
.../examples/GSASingleSourceShortestPaths.java | 4 +-
.../parameter/IterationConvergenceTest.java | 66 ++++
.../examples/ConnectedComponentsITCase.java | 72 ----
.../main/java/org/apache/flink/graph/Graph.java | 16 +-
.../graph/library/ConnectedComponents.java | 5 +-
.../graph/library/GSAConnectedComponents.java | 8 +-
.../flink/graph/library/LabelPropagation.java | 5 +-
.../clustering/directed/TriangleListing.java | 2 +-
.../undirected/LocalClusteringCoefficient.java | 2 +-
.../graph/library/link_analysis/PageRank.java | 8 +-
.../graph/library/similarity/AdamicAdar.java | 2 +-
.../graph/library/similarity/JaccardIndex.java | 2 +-
.../apache/flink/graph/utils/GraphUtils.java | 10 +-
.../flink/graph/utils/NullValueEdgeMapper.java | 32 --
28 files changed, 919 insertions(+), 1536 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index d923bf0..642fe5b 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -28,7 +28,6 @@ public class Usage {
private static final Class[] DRIVERS = new Class[]{
org.apache.flink.graph.drivers.ClusteringCoefficient.class,
- org.apache.flink.graph.drivers.Graph500.class,
org.apache.flink.graph.drivers.GraphMetrics.class,
org.apache.flink.graph.drivers.HITS.class,
org.apache.flink.graph.drivers.JaccardIndex.class,
@@ -36,7 +35,6 @@ public class Usage {
};
private static final Class[] EXAMPLES = new Class[]{
- org.apache.flink.graph.examples.ConnectedComponents.class,
org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
org.apache.flink.graph.examples.IncrementalSSSP.class,
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
new file mode 100644
index 0000000..742c1de
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
+ */
+public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
+
+ private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+ .setDefaultValue(PARALLELISM_DEFAULT);
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "similarity score weighted by centerpoint degree";
+ }
+
+ @Override
+ public String getLongDescription() {
+ return WordUtils.wrap(new StrBuilder()
+ .appendln("Adamic-Adar measures the similarity between vertex neighborhoods and is " +
+ "computed as the sum of the inverse logarithm of centerpoint degree over shared " +
+ "neighbors.")
+ .appendNewLine()
+ .append("The algorithm result contains two vertex IDs and the similarity score.")
+ .toString(), 80);
+ }
+
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ int lp = littleParallelism.getValue().intValue();
+
+ result = graph
+ .run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
+ .setLittleParallelism(lp));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 004390d..c463c0a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -18,333 +18,127 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Driver for the library implementations of Global and Local Clustering Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
+ * Driver for directed and undirected clustering coefficient algorithm and analytics.
*
+ * @see org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
* @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
*/
-public class ClusteringCoefficient {
-
- private static final int DEFAULT_SCALE = 10;
-
- private static final int DEFAULT_EDGE_FACTOR = 16;
-
- private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
- private static String getUsage(String message) {
- return new StrBuilder()
- .appendNewLine()
- .appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
- " vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
- " Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
- " is a clique).", 80))
- .appendNewLine()
- .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
- " the vertex, and the number of edges between vertex neighbors.", 80))
- .appendNewLine()
- .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
- .appendNewLine()
- .appendln("options:")
- .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
- .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
- .appendNewLine()
- .appendln(" --output print")
- .appendln(" --output hash")
- .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
- .appendNewLine()
- .appendln("Usage error: " + message)
- .toString();
- }
-
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
-
- if (! parameters.has("directed")) {
- throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
- }
- boolean directedAlgorithm = parameters.getBoolean("directed");
-
- int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
- // global and local clustering coefficient results
- GraphAnalytic gcc;
- GraphAnalytic acc;
- DataSet lcc;
-
- switch (parameters.get("input", "")) {
- case "csv": {
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
- .ignoreCommentsEdges("#")
- .lineDelimiterEdges(lineDelimiter)
- .fieldDelimiterEdges(fieldDelimiter);
-
- switch (parameters.get("type", "")) {
- case "integer": {
- Graph<LongValue, NullValue, NullValue> graph = reader
- .keyType(LongValue.class);
+public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
- }
+ private static final String DIRECTED = "directed";
- gcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
+ private static final String UNDIRECTED = "undirected";
- gcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } break;
+ private ChoiceParameter order = new ChoiceParameter(this, "order")
+ .addChoices(DIRECTED, UNDIRECTED);
- case "string": {
- Graph<StringValue, NullValue, NullValue> graph = reader
- .keyType(StringValue.class);
+ private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+ .setDefaultValue(PARALLELISM_DEFAULT);
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
- }
+ private GraphAnalytic<K, VV, EV, ? extends PrintableResult> globalClusteringCoefficient;
- gcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
+ private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
- gcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid CSV type"));
- }
- } break;
-
- case "rmat": {
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setParallelism(little_parallelism)
- .generate();
-
- if (directedAlgorithm) {
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
-
- gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false)
- .setLittleParallelism(little_parallelism));
- } else {
- Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
- .setParallelism(little_parallelism))
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
-
- gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false)
- .setLittleParallelism(little_parallelism));
- }
- } else {
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism));
+ @Override
+ public String getShortDescription() {
+ return "measure the connectedness of vertex neighborhoods";
+ }
- gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false)
- .setLittleParallelism(little_parallelism));
- } else {
- Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
- .setParallelism(little_parallelism))
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism));
+ @Override
+ public String getLongDescription() {
+ return WordUtils.wrap(new StrBuilder()
+ .appendln("The local clustering coefficient measures the connectedness of each " +
+ "vertex's neighborhood. The global clustering coefficient measures the " +
+ "connectedness of the graph. The average clustering coefficient is the mean local " +
+ "clustering coefficient. Each score ranges from 0.0 (no edges between vertex " +
+ "neighbors) to 1.0 (neighborhood or graph is a clique).")
+ .appendNewLine()
+ .append("The algorithm result contains the vertex ID, degree, and number of edges " +
+ "connecting neighbors.")
+ .toString(), 80);
+ }
- gcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- acc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- lcc = newGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false)
- .setLittleParallelism(little_parallelism));
- }
- }
- } break;
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ int lp = littleParallelism.getValue().intValue();
- default:
- throw new ProgramParametrizationException(getUsage("invalid input type"));
- }
+ switch (order.getValue()) {
+ case DIRECTED:
+ result = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
- switch (parameters.get("output", "")) {
- case "print":
- if (directedAlgorithm) {
- for (Object e: lcc.collect()) {
- org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
- (org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
- System.out.println(result.toPrintableString());
- }
- } else {
- for (Object e: lcc.collect()) {
- org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
- (org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
- System.out.println(result.toPrintableString());
- }
- }
- break;
+ globalClusteringCoefficient = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
- case "hash":
- System.out.println(DataSetUtils.checksumHashCode(lcc));
+ averageClusteringCoefficient = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
break;
- case "csv":
- String filename = parameters.get("output_filename");
+ case UNDIRECTED:
+ result = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+ globalClusteringCoefficient = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
- env.execute("Clustering Coefficient");
+ averageClusteringCoefficient = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(lp));
break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid output type"));
}
+ }
- System.out.println(gcc.getResult());
- System.out.println(acc.getResult());
+ @Override
+ public void hash(String executionName) throws Exception {
+ super.hash(executionName);
+ printAnalytics();
+ }
- JobExecutionResult result = env.getLastJobExecutionResult();
+ @Override
+ public void print(String executionName) throws Exception {
+ super.print(executionName);
+ printAnalytics();
+ }
+
+ @Override
+ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+ super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+ printAnalytics();
+ }
- NumberFormat nf = NumberFormat.getInstance();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ private void printAnalytics() {
+ System.out.println(globalClusteringCoefficient.getResult().toPrintableString());
+ System.out.println(averageClusteringCoefficient.getResult().toPrintableString());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
new file mode 100644
index 0000000..32263cf
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+
+import java.util.List;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}.
+ *
+ * The gather-sum-apply implementation is used because scatter-gather does not
+ * handle object reuse (see FLINK-5891).
+ */
+public class ConnectedComponents<K extends Comparable<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+ private DataSet<Vertex<K, K>> components;
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "ConnectedComponents";
+ }
+
+ @Override
+ public String getLongDescription() {
+ return "ConnectedComponents";
+ }
+
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ components = graph
+ .mapVertices(new MapVertices<K, VV>())
+ .run(new GSAConnectedComponents<K, K, EV>(Integer.MAX_VALUE));
+ }
+
+ @Override
+ public void hash(String executionName) throws Exception {
+ Checksum checksum = new ChecksumHashCode<Vertex<K, K>>()
+ .run(components)
+ .execute(executionName);
+
+ System.out.println(checksum);
+ }
+
+ @Override
+ public void print(String executionName) throws Exception {
+ Collect<Vertex<K, K>> collector = new Collect<>();
+
+ // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+ List<Vertex<K, K>> records = collector.run(components).execute(executionName);
+
+ for (Vertex<K, K> result : records) {
+ System.out.println(result);
+ }
+ }
+
+ @Override
+ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+ components
+ .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+ .name("CSV: " + filename);
+ }
+
+ private static final class MapVertices<T, VT>
+ implements MapFunction<Vertex<T, VT>, T> {
+ @Override
+ public T map(Vertex<T, VT> value) throws Exception {
+ return value.f0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
new file mode 100644
index 0000000..85f32c3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s.
+ */
+public class EdgeList<K, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+ private DataSet<Edge<K, EV>> edges;
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "the edge list";
+ }
+
+ @Override
+ public String getLongDescription() {
+ return "Pass-through of the graph's edge list.";
+ }
+
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ edges = graph
+ .getEdges();
+ }
+
+ @Override
+ public void hash(String executionName) throws Exception {
+ Checksum checksum = new ChecksumHashCode<Edge<K, EV>>()
+ .run(edges)
+ .execute(executionName);
+
+ System.out.println(checksum);
+ }
+
+ @Override
+ public void print(String executionName) throws Exception {
+ Collect<Edge<K, EV>> collector = new Collect<>();
+
+ // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+ List<Edge<K, EV>> records = collector.run(edges).execute(executionName);
+
+ for (Edge<K, EV> result : records) {
+ System.out.println(result);
+ }
+
+ }
+
+ @Override
+ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+ edges
+ .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+ .name("CSV: " + filename);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
deleted file mode 100644
index c2abbf7..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.drivers;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
- */
-public class Graph500 {
-
- private static final int DEFAULT_SCALE = 10;
-
- private static final int DEFAULT_EDGE_FACTOR = 16;
-
- private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
- private static String getUsage(String message) {
- return new StrBuilder()
- .appendNewLine()
- .appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.")
- .appendNewLine()
- .appendln(WordUtils.wrap("The graph matrix contains 2^scale vertices although not every vertex will" +
- " be represented in an edge. The number of edges is edge_factor * 2^scale edges" +
- " although some edges may be duplicates.", 80))
- .appendNewLine()
- .appendln("Note: this does not yet implement permutation of vertex labels or edges.")
- .appendNewLine()
- .appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv [options]>")
- .appendNewLine()
- .appendln("options:")
- .appendln(" --output print")
- .appendln(" --output hash")
- .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
- .appendNewLine()
- .appendln("Usage error: " + message)
- .toString();
- }
-
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
-
- if (! parameters.has("directed")) {
- throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
- }
- boolean directed = parameters.getBoolean("directed");
-
- if (! parameters.has("simplify")) {
- throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
- }
- boolean simplify = parameters.getBoolean("simplify");
-
-
- // Generate RMat graph
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .generate();
-
- if (directed) {
- if (simplify) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
- }
- } else {
- if (simplify) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
- } else {
- graph = graph.getUndirected();
- }
- }
-
- DataSet<Tuple2<LongValue, LongValue>> edges = graph
- .getEdges()
- .project(0, 1);
-
- // Print, hash, or write RMat graph to disk
- switch (parameters.get("output", "")) {
- case "print":
- System.out.println();
- edges.print();
- break;
-
- case "hash":
- System.out.println();
- System.out.println(DataSetUtils.checksumHashCode(edges));
- break;
-
- case "csv":
- String filename = parameters.getRequired("output_filename");
-
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
- env.execute("Graph500");
- break;
- default:
- throw new ProgramParametrizationException(getUsage("invalid output type"));
- }
-
- JobExecutionResult result = env.getLastJobExecutionResult();
-
- NumberFormat nf = NumberFormat.getInstance();
- System.out.println();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 9b246df..cc5a894 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -18,224 +18,109 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.CopyableValue;
/**
- * Computes vertex and edge metrics on a directed or undirected graph.
+ * Driver for directed and undirected graph metrics analytics.
*
* @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
* @see org.apache.flink.graph.library.metric.directed.VertexMetrics
* @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
* @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
*/
-public class GraphMetrics {
+public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, Hash, Print {
- private static final int DEFAULT_SCALE = 10;
+ private static final String DIRECTED = "directed";
- private static final int DEFAULT_EDGE_FACTOR = 16;
+ private static final String UNDIRECTED = "undirected";
- private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+ private ChoiceParameter order = new ChoiceParameter(this, "order")
+ .addChoices(DIRECTED, UNDIRECTED);
- private static String getUsage(String message) {
+ private GraphAnalytic<K, VV, EV, ? extends PrintableResult> vertexMetrics;
+
+ private GraphAnalytic<K, VV, EV, ? extends PrintableResult> edgeMetrics;
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "compute vertex and edge metrics";
+ }
+
+ @Override
+ public String getLongDescription() {
return new StrBuilder()
+ .appendln("Computes metrics on a directed or undirected graph.")
.appendNewLine()
- .appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
- .appendNewLine()
- .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
+ .appendln("Vertex metrics:")
+ .appendln("- number of vertices")
+ .appendln("- number of edges")
+ .appendln("- number of unidirectional edges (directed only)")
+ .appendln("- number of bidirectional edges (directed only)")
+ .appendln("- average degree")
+ .appendln("- number of triplets")
+ .appendln("- maximum degree")
+ .appendln("- maximum out degree (directed only)")
+ .appendln("- maximum in degree (directed only)")
+ .appendln("- maximum number of triplets")
.appendNewLine()
- .appendln("options:")
- .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
- .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
- .appendNewLine()
- .appendln("Usage error: " + message)
+ .appendln("Edge metrics:")
+ .appendln("- number of triangle triplets")
+ .appendln("- number of rectangle triplets")
+ .appendln("- maximum number of triangle triplets")
+ .append("- maximum number of rectangle triplets")
.toString();
}
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ switch (order.getValue()) {
+ case DIRECTED:
+ vertexMetrics = graph
+ .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
+ edgeMetrics = graph
+ .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
+ break;
- if (! parameters.has("directed")) {
- throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
- }
- boolean directedAlgorithm = parameters.getBoolean("directed");
-
- GraphAnalytic vm;
- GraphAnalytic em;
-
- switch (parameters.get("input", "")) {
- case "csv": {
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- GraphCsvReader reader = Graph
- .fromCsvReader(parameters.getRequired("input_filename"), env)
- .ignoreCommentsEdges("#")
- .lineDelimiterEdges(lineDelimiter)
- .fieldDelimiterEdges(fieldDelimiter);
-
- switch (parameters.get("type", "")) {
- case "integer": {
- Graph<LongValue, NullValue, NullValue> graph = reader
- .keyType(LongValue.class);
-
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
- }
-
- vm = graph
- .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
- em = graph
- .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
- }
-
- vm = graph
- .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
- em = graph
- .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
- }
- } break;
-
- case "string": {
- Graph<StringValue, NullValue, NullValue> graph = reader
- .keyType(StringValue.class);
-
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
- }
-
- vm = graph
- .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
- em = graph
- .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
- }
-
- vm = graph
- .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
- em = graph
- .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid CSV type"));
- }
- } break;
-
- case "rmat": {
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .generate();
-
- if (directedAlgorithm) {
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-
- vm = newGraph
- .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
- em = newGraph
- .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
- } else {
- Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
-
- vm = newGraph
- .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
- em = newGraph
- .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
- }
- } else {
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> newGraph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
- vm = newGraph
- .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
- em = newGraph
- .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
- } else {
- Graph<IntValue, NullValue, NullValue> newGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
-
- vm = newGraph
- .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
- em = newGraph
- .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
- }
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid input type"));
+ case UNDIRECTED:
+ vertexMetrics = graph
+ .run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
+
+ edgeMetrics = graph
+ .run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
+ break;
}
+ }
- env.execute("Graph Metrics");
+ @Override
+ public void hash(String executionName) throws Exception {
+ print(executionName);
+ }
- System.out.println();
- System.out.print("Vertex metrics:\n ");
- System.out.println(vm.getResult().toString().replace(";", "\n "));
- System.out.println();
- System.out.print("Edge metrics:\n ");
- System.out.println(em.getResult().toString().replace(";", "\n "));
+ @Override
+ public void print(String executionName) throws Exception {
+ vertexMetrics.execute(executionName);
- JobExecutionResult result = env.getLastJobExecutionResult();
+ System.out.print("Vertex metrics:\n ");
+ System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n "));
- NumberFormat nf = NumberFormat.getInstance();
System.out.println();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ System.out.print("Edge metrics:\n ");
+ System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n "));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index db27f0e..6081fea 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -18,177 +18,51 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
/**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
+ * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}.
*/
-public class HITS {
+public class HITS<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
private static final int DEFAULT_ITERATIONS = 10;
- private static final int DEFAULT_SCALE = 10;
+ private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
- private static final int DEFAULT_EDGE_FACTOR = 16;
-
- private static String getUsage(String message) {
- return new StrBuilder()
- .appendNewLine()
- .appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
- " scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
- " and good \"authorities\" are linked from good \"hubs\".", 80))
- .appendNewLine()
- .appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
- .appendNewLine()
- .appendln("options:")
- .appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
- .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
- .appendNewLine()
- .appendln(" --output print")
- .appendln(" --output hash")
- .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
- .appendNewLine()
- .appendln("Usage error: " + message)
- .toString();
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
}
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
-
- int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
-
- DataSet hits;
-
- switch (parameters.get("input", "")) {
- case "csv": {
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- GraphCsvReader reader = Graph
- .fromCsvReader(parameters.getRequired("input_filename"), env)
- .ignoreCommentsEdges("#")
- .lineDelimiterEdges(lineDelimiter)
- .fieldDelimiterEdges(fieldDelimiter);
-
- switch (parameters.get("type", "")) {
- case "integer": {
- hits = reader
- .keyType(LongValue.class)
- .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
- } break;
-
- case "string": {
- hits = reader
- .keyType(StringValue.class)
- .run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid CSV type"));
- }
- } break;
-
- case "rmat": {
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .generate();
-
- if (scale > 32) {
- hits = graph
- .run(new Simplify<LongValue, NullValue, NullValue>())
- .run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
- } else {
- hits = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
- .run(new Simplify<IntValue, NullValue, NullValue>())
- .run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid input type"));
- }
-
- switch (parameters.get("output", "")) {
- case "print":
- System.out.println();
- for (Object e: hits.collect()) {
- System.out.println(((Result)e).toPrintableString());
- }
- break;
-
- case "hash":
- System.out.println();
- System.out.println(DataSetUtils.checksumHashCode(hits));
- break;
-
- case "csv":
- String filename = parameters.getRequired("output_filename");
-
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
- env.execute("HITS");
- break;
- default:
- throw new ProgramParametrizationException(getUsage("invalid output type"));
- }
+ @Override
+ public String getShortDescription() {
+ return "score vertices as hubs and authorities";
+ }
- JobExecutionResult result = env.getLastJobExecutionResult();
+ @Override
+ public String getLongDescription() {
+ return WordUtils.wrap(new StrBuilder()
+ .appendln("Hyperlink-Induced Topic Search computes two interdependent scores for " +
+ "each vertex in a directed graph. A good \"hub\" links to good \"authorities\" " +
+ "and good \"authorities\" are linked to from good \"hubs\".")
+ .appendNewLine()
+ .append("The result contains the vertex ID, hub score, and authority score.")
+ .toString(), 80);
+ }
- NumberFormat nf = NumberFormat.getInstance();
- System.out.println();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ result = graph
+ .run(new org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>(
+ iterationConvergence.getValue().iterations,
+ iterationConvergence.getValue().convergenceThreshold));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 09479a6..1c836ea 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -18,211 +18,57 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.types.CopyableValue;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
*/
-public class JaccardIndex {
+public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
- private static final int DEFAULT_SCALE = 10;
+ private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+ .setDefaultValue(PARALLELISM_DEFAULT);
- private static final int DEFAULT_EDGE_FACTOR = 16;
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
- private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+ @Override
+ public String getShortDescription() {
+ return "similarity score as fraction of common neighbors";
+ }
- private static String getUsage(String message) {
- return new StrBuilder()
- .appendNewLine()
- .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
- " neighborhoods and is computed as the number of shared neighbors divided by the number of" +
- " distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
- " shared).", 80))
- .appendNewLine()
- .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
- " number of shared neighbors, and the number of distinct neighbors.", 80))
- .appendNewLine()
- .appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
- .appendNewLine()
- .appendln("options:")
- .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
- .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
- .appendNewLine()
- .appendln(" --output print")
- .appendln(" --output hash")
- .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+ @Override
+ public String getLongDescription() {
+ return WordUtils.wrap(new StrBuilder()
+ .appendln("Jaccard Index measures the similarity between vertex neighborhoods and " +
+ "is computed as the number of shared neighbors divided by the number of " +
+ "distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all " +
+ "neighbors are shared).")
.appendNewLine()
- .appendln("Usage error: " + message)
- .toString();
+ .append("The result contains two vertex IDs, the number of shared neighbors, and " +
+ "the number of distinct neighbors.")
+ .toString(), 80);
}
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
-
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
-
- int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
- DataSet ji;
-
- switch (parameters.get("input", "")) {
- case "csv": {
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- GraphCsvReader reader = Graph
- .fromCsvReader(parameters.getRequired("input_filename"), env)
- .ignoreCommentsEdges("#")
- .lineDelimiterEdges(lineDelimiter)
- .fieldDelimiterEdges(fieldDelimiter);
-
- switch (parameters.get("type", "")) {
- case "integer": {
- Graph<LongValue, NullValue, NullValue> graph = reader
- .keyType(LongValue.class);
-
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
-
- ji = graph
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } break;
-
- case "string": {
- Graph<StringValue, NullValue, NullValue> graph = reader
- .keyType(StringValue.class);
-
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
-
- ji = graph
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid CSV type"));
- }
- } break;
-
- case "rmat": {
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setParallelism(little_parallelism)
- .generate();
-
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
- if (scale > 32) {
- ji = graph
- .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism))
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- ji = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
- .setParallelism(little_parallelism))
- .run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism))
- .run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid input type"));
- }
-
- switch (parameters.get("output", "")) {
- case "print":
- System.out.println();
- for (Object e: ji.collect()) {
- Result result = (Result)e;
- System.out.println(result.toPrintableString());
- }
- break;
-
- case "hash":
- System.out.println();
- System.out.println(DataSetUtils.checksumHashCode(ji));
- break;
-
- case "csv":
- String filename = parameters.getRequired("output_filename");
-
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
- env.execute("Jaccard Index");
- break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid output type"));
- }
-
- JobExecutionResult result = env.getLastJobExecutionResult();
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ int lp = littleParallelism.getValue().intValue();
- NumberFormat nf = NumberFormat.getInstance();
- System.out.println();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ result = graph
+ .run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
+ .setLittleParallelism(lp));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
new file mode 100644
index 0000000..8cef077
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+
+/**
+ * @see org.apache.flink.graph.library.link_analysis.PageRank
+ */
+public class PageRank<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
+
+ private static final int DEFAULT_ITERATIONS = 10;
+
+ private DoubleParameter dampingFactor = new DoubleParameter(this, "damping_factor")
+ .setDefaultValue(0.85)
+ .setMinimumValue(0.0, false)
+ .setMaximumValue(1.0, false);
+
+ private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "score vertices by the number and quality of incoming links";
+ }
+
+ @Override
+ public String getLongDescription() {
+ return new StrBuilder()
+ .appendln("PageRank computes a per-vertex score which is the sum of PageRank scores " +
+ "transmitted over in-edges. Each vertex's score is divided evenly among " +
+ "out-edges. High-scoring vertices are linked to by other high-scoring vertices.")
+ .appendNewLine()
+ .append("The result contains the vertex ID and PageRank score.")
+ .toString();
+ }
+
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ result = graph
+ .run(new org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>(
+ dampingFactor.getValue(),
+ iterationConvergence.getValue().iterations,
+ iterationConvergence.getValue().convergenceThreshold));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
new file mode 100644
index 0000000..98bdfc5
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * A base driver storing a single result {@link DataSet} with values
+ * implementing {@link PrintableResult}.
+ *
+ * @param <R> algorithm's result type
+ */
+public abstract class SimpleDriver<R extends PrintableResult>
+extends ParameterizedBase {
+
+ protected DataSet<? extends R> result;
+
+ public void hash(String executionName) throws Exception {
+ Checksum checksum = new ChecksumHashCode<R>()
+ .run((DataSet<R>) result)
+ .execute(executionName);
+
+ System.out.println(checksum);
+ }
+
+ public void print(String executionName) throws Exception {
+ Collect<R> collector = new Collect<>();
+
+ // Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+ List<R> records = collector.run((DataSet<R>) result).execute(executionName);
+
+ for (R result : records) {
+ System.out.println(result.toPrintableString());
+ }
+ }
+
+ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+ result
+ .writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+ .name("CSV: " + filename);
+ }
+}