You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/31 16:35:28 UTC
[5/6] flink git commit: [FLINK-5913] [gelly] Example drivers
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 93a96c4..ca0c167 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -18,315 +18,125 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
/**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
+ * Driver for the directed and undirected triangle listing algorithms and triadic census analytics.
*
* @see org.apache.flink.graph.library.clustering.directed.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.directed.TriadicCensus
* @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
*/
-public class TriangleListing {
-
- private static final int DEFAULT_SCALE = 10;
-
- private static final int DEFAULT_EDGE_FACTOR = 16;
+public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
- private static final boolean DEFAULT_TRIADIC_CENSUS = true;
-
- private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
- private static String getUsage(String message) {
- return new StrBuilder()
- .appendNewLine()
- .appendln(WordUtils.wrap("Lists all triangles in a graph.", 80))
- .appendNewLine()
- .appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
- " for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80))
- .appendNewLine()
- .appendln("usage: TriangleListing --directed <true | false> [--triadic_census <true | false>] --input <csv | rmat> --output <print | hash | csv>")
- .appendNewLine()
- .appendln("options:")
- .appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
- .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
- .appendNewLine()
- .appendln(" --output print")
- .appendln(" --output hash")
- .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
- .appendNewLine()
- .appendln("Usage error: " + message)
- .toString();
- }
+ private static final String DIRECTED = "directed";
- public static void main(String[] args) throws Exception {
- // Set up the execution environment
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableObjectReuse();
+ private static final String UNDIRECTED = "undirected";
- ParameterTool parameters = ParameterTool.fromArgs(args);
- env.getConfig().setGlobalJobParameters(parameters);
+ private ChoiceParameter order = new ChoiceParameter(this, "order")
+ .addChoices(DIRECTED, UNDIRECTED);
- if (! parameters.has("directed")) {
- throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
- }
- boolean directedAlgorithm = parameters.getBoolean("directed");
+ private BooleanParameter sortTriangleVertices = new BooleanParameter(this, "sort_triangle_vertices");
- int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
- boolean triadic_census = parameters.getBoolean("triadic_census", DEFAULT_TRIADIC_CENSUS);
+ private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census");
- GraphAnalytic tc = null;
- DataSet tl;
-
- switch (parameters.get("input", "")) {
- case "csv": {
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
- GraphCsvReader reader = Graph
- .fromCsvReader(parameters.getRequired("input_filename"), env)
- .ignoreCommentsEdges("#")
- .lineDelimiterEdges(lineDelimiter)
- .fieldDelimiterEdges(fieldDelimiter);
-
- switch (parameters.get("type", "")) {
- case "integer": {
- Graph<LongValue, NullValue, NullValue> graph = reader
- .keyType(LongValue.class);
-
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
- }
-
- if (triadic_census) {
- tc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
-
- if (triadic_census) {
- tc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } break;
-
- case "string": {
- Graph<StringValue, NullValue, NullValue> graph = reader
- .keyType(StringValue.class);
-
- if (directedAlgorithm) {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
- }
-
- if (triadic_census) {
- tc = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = graph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- if (parameters.getBoolean("simplify", false)) {
- graph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
- .setParallelism(little_parallelism));
- }
-
- if (triadic_census) {
- tc = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = graph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } break;
-
- default:
- throw new ProgramParametrizationException(getUsage("invalid CSV type"));
- }
+ private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+ .setDefaultValue(PARALLELISM_DEFAULT);
+ private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;
- } break;
-
- case "rmat": {
- int scale = parameters.getInt("scale", DEFAULT_SCALE);
- int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
- RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
- long vertexCount = 1L << scale;
- long edgeCount = vertexCount * edgeFactor;
-
- Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .generate();
-
- if (directedAlgorithm) {
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> simpleGraph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
-
- if (triadic_census) {
- tc = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- Graph<LongValue, NullValue, NullValue> simpleGraph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
- .setParallelism(little_parallelism));
-
- if (triadic_census) {
- tc = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- } else {
- boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
- if (scale > 32) {
- Graph<LongValue, NullValue, NullValue> simpleGraph = graph
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism));
+ @Override
+ public String getShortDescription() {
+ return "list triangles";
+ }
- if (triadic_census) {
- tc = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- } else {
- Graph<IntValue, NullValue, NullValue> simpleGraph = graph
- .run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
- .setParallelism(little_parallelism))
- .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
- .setParallelism(little_parallelism));
+ /**
+ * Returns the long description for this driver, wrapped at 80 columns.
+ */
+ @Override
+ public String getLongDescription() {
+ return WordUtils.wrap(new StrBuilder()
+ .appendln("List all triangles in a graph.")
+ .appendNewLine()
+ .append("The algorithm result contains three vertex IDs. For the directed algorithm " +
+ "the result contains an additional bitmask indicating the presence of the six " +
+ "potential connecting edges.")
+ .toString(), 80);
+ }
- if (triadic_census) {
- tc = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
- tl = simpleGraph
- .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()
- .setLittleParallelism(little_parallelism));
- }
+ @Override
+ public void plan(Graph<K, VV, EV> graph) throws Exception {
+ int lp = littleParallelism.getValue().intValue();
+
+ switch (order.getValue()) {
+ case DIRECTED:
+ result = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
+ .setSortTriangleVertices(sortTriangleVertices.getValue())
+ .setLittleParallelism(lp));
+
+ if (computeTriadicCensus.getValue()) {
+ triadicCensus = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
+ .setLittleParallelism(lp));
}
- } break;
+ break;
- default:
- throw new ProgramParametrizationException(getUsage("invalid input type"));
- }
+ case UNDIRECTED:
+ result = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
+ .setSortTriangleVertices(sortTriangleVertices.getValue())
+ .setLittleParallelism(lp));
- switch (parameters.get("output", "")) {
- case "print":
- System.out.println();
- if (directedAlgorithm) {
- for (Object e: tl.collect()) {
- org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
- (org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
- System.out.println(result.toPrintableString());
- }
- } else {
- tl.print();
+ if (computeTriadicCensus.getValue()) {
+ triadicCensus = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
+ .setLittleParallelism(lp));
}
break;
+ }
+ }
- case "hash":
- System.out.println();
- System.out.println(DataSetUtils.checksumHashCode(tl));
- break;
-
- case "csv":
- String filename = parameters.getRequired("output_filename");
-
- String lineDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
- String fieldDelimiter = StringEscapeUtils.unescapeJava(
- parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+ @Override
+ public void hash(String executionName) throws Exception {
+ super.hash(executionName);
+ printAnalytics();
+ }
- tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+ @Override
+ public void print(String executionName) throws Exception {
+ super.print(executionName);
+ printAnalytics();
+ }
- env.execute();
- break;
- default:
- throw new ProgramParametrizationException(getUsage("invalid output type"));
- }
+ @Override
+ public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+ super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+ printAnalytics();
+ }
- if (tc != null) {
+ private void printAnalytics() {
+ if (computeTriadicCensus.getValue()) {
System.out.print("Triadic census:\n ");
- System.out.println(tc.getResult().toString().replace(";", "\n "));
+ System.out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n "));
}
-
- JobExecutionResult result = env.getLastJobExecutionResult();
-
- NumberFormat nf = NumberFormat.getInstance();
- System.out.println();
- System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
new file mode 100644
index 0000000..e9d648a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence.Value;
+
+/**
+ * Iterative algorithms which converge can be terminated with a maximum number
+ * of iterations or a convergence threshold which stops computation when the
+ * total change in scores is below a given delta.
+ *
+ * If the command-line configuration specifies neither a number of iterations
+ * nor a convergence threshold then the default number of iterations is used
+ * with the threshold set to {@code Double.MAX_VALUE}. Otherwise, when only one
+ * value is configured, the other is set to its type's {@code MAX_VALUE}.
+ */
+public class IterationConvergence
+implements Parameter<Value> {
+
+ private final int defaultIterations;
+
+ private final Value value = new Value();
+
+ /**
+ * Add this parameter to the list of parameters stored by owner.
+ *
+ * @param owner the {@link Parameterized} using this {@link Parameter}
+ * @param defaultIterations the default number of iterations if neither
+ * the number of iterations nor the convergence
+ * threshold are specified
+ */
+ public IterationConvergence(ParameterizedBase owner, int defaultIterations) {
+ owner.addParameter(this);
+ this.defaultIterations = defaultIterations;
+ }
+
+ @Override
+ public String getUsage() {
+ return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD]";
+ }
+
+ @Override
+ public void configure(ParameterTool parameterTool) {
+ if (!parameterTool.has("iterations") && !parameterTool.has("convergence_threshold")) {
+ // no configuration so use default iterations and maximum threshold
+ value.iterations = defaultIterations;
+ value.convergenceThreshold = Double.MAX_VALUE;
+ } else {
+ // use configured values and maximum default for unset values
+ value.iterations = parameterTool.getInt("iterations", Integer.MAX_VALUE);
+ Util.checkParameter(value.iterations > 0,
+ "iterations must be greater than zero");
+
+ value.convergenceThreshold = parameterTool.getDouble("convergence_threshold", Double.MAX_VALUE);
+ Util.checkParameter(value.convergenceThreshold > 0,
+ "convergence threshold must be greater than zero");
+ }
+ }
+
+ @Override
+ public Value getValue() {
+ return value;
+ }
+
+ /**
+ * Encapsulate the number of iterations and the convergence threshold.
+ */
+ public static class Value {
+ public int iterations;
+ public double convergenceThreshold;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
deleted file mode 100644
index 6651739..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ConnectedComponents.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in {@link org.apache.flink.graph.library}.
- *
- * In particular, this example uses the {@link GSAConnectedComponents}
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents <edge path> <result path>
- * <number of iterations> </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link ConnectedComponentsDefaultData}
- */
-public class ConnectedComponents implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String [] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return value;
- }
- }, env);
-
- DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
- .run(new GSAConnectedComponents<Long, Long, NullValue>(maxIterations));
-
- // emit result
- if (fileOutput) {
- verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("Connected Components Example");
- } else {
- verticesWithMinIds.print();
- }
- }
-
- @Override
- public String getDescription() {
- return "Connected Components Example";
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
-
- private static boolean parseParameters(String [] args) {
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
-
- } else {
- System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
- System.out.println("Provide parameters to read input data from files.");
- System.out.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>");
- }
-
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .ignoreComments("#")
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
- @Override
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- } else {
- return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
index 35f07b0..1cd3549 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSASingleSourceShortestPaths.java
@@ -114,7 +114,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
public Double gather(Neighbor<Double, Double> neighbor) {
return neighbor.getNeighborValue() + neighbor.getEdgeValue();
}
- };
+ }
@SuppressWarnings("serial")
private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -122,7 +122,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
public Double sum(Double newValue, Double currentValue) {
return Math.min(newValue, currentValue);
}
- };
+ }
@SuppressWarnings("serial")
private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
new file mode 100644
index 0000000..ae92943
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/parameter/IterationConvergenceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.parameter;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IterationConvergenceTest
+extends ParameterTestBase {
+
+ private IterationConvergence parameter;
+
+ @Before
+ public void setup() {
+ super.setup();
+
+ parameter = new IterationConvergence(owner, 10);
+ }
+
+ @Test
+ public void testWithIterations() {
+ parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42"}));
+ Assert.assertEquals(42, parameter.getValue().iterations);
+ Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001);
+ }
+
+ @Test
+ public void testWithConvergenceThreshold() {
+ parameter.configure(ParameterTool.fromArgs(new String[]{"--convergence_threshold", "42"}));
+ Assert.assertEquals(Integer.MAX_VALUE, parameter.getValue().iterations);
+ Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001);
+ }
+
+ @Test
+ public void testWithBoth() {
+ parameter.configure(ParameterTool.fromArgs(new String[]{"--iterations", "42", "--convergence_threshold", "42"}));
+ Assert.assertEquals(42, parameter.getValue().iterations);
+ Assert.assertEquals(42.0, parameter.getValue().convergenceThreshold, 0.000001);
+ }
+
+ @Test
+ public void testWithNeither() {
+ parameter.configure(ParameterTool.fromArgs(new String[]{}));
+ Assert.assertEquals(10, parameter.getValue().iterations);
+ Assert.assertEquals(Double.MAX_VALUE, parameter.getValue().convergenceThreshold, 0.000001);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
deleted file mode 100644
index d0de8dc..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.examples;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.examples.ConnectedComponents;
-import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
-
- private String edgesPath;
-
- private String resultPath;
-
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- public ConnectedComponentsITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Before
- public void before() throws Exception {
- resultPath = tempFolder.newFile().toURI().toString();
-
- File edgesFile = tempFolder.newFile();
- Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
- edgesPath = edgesFile.toURI().toString();
- }
-
- @Test
- public void testConnectedComponentsExample() throws Exception {
- ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
- expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
- }
-
- @After
- public void after() throws Exception {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index cbbfb02..71baaa9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -529,7 +529,13 @@ public class Graph<K, VV, EV> {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
- TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
+ TypeInformation<NV> valueType;
+
+ if (mapper instanceof ResultTypeQueryable) {
+ valueType = ((ResultTypeQueryable) mapper).getProducedType();
+ } else {
+ valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, vertices.getType(), null);
+ }
TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
Vertex.class, keyType, valueType);
@@ -573,7 +579,13 @@ public class Graph<K, VV, EV> {
TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
- TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
+ TypeInformation<NV> valueType;
+
+ if (mapper instanceof ResultTypeQueryable) {
+ valueType = ((ResultTypeQueryable) mapper).getProducedType();
+ } else {
+ valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, edges.getType(), null);
+ }
TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
Edge.class, keyType, keyType, valueType);
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 3cd8f05..959b816 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
import org.apache.flink.types.NullValue;
/**
@@ -72,7 +73,7 @@ public class ConnectedComponents<K, VV extends Comparable<VV>, EV>
TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
Graph<K, VV, NullValue> undirectedGraph = graph
- .mapEdges(new NullValueEdgeMapper<K, EV>())
+ .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
.getUndirected();
return undirectedGraph.runScatterGatherIteration(
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 327de73..1680f38 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
@@ -29,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
import org.apache.flink.types.NullValue;
/**
@@ -73,7 +74,7 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
TypeInformation<VV> valueTypeInfo = ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);
Graph<K, VV, NullValue> undirectedGraph = graph
- .mapEdges(new NullValueEdgeMapper<K, EV>())
+ .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
.getUndirected();
return undirectedGraph.runGatherSumApplyIteration(
@@ -87,7 +88,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
// Connected Components UDFs
// --------------------------------------------------------------------------------------------
- @SuppressWarnings("serial")
private static final class GatherNeighborIds<VV extends Comparable<VV>>
extends GatherFunction<VV, NullValue, VV>
implements ResultTypeQueryable<VV> {
@@ -108,7 +108,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
}
}
- @SuppressWarnings("serial")
private static final class SelectMinId<VV extends Comparable<VV>>
extends SumFunction<VV, NullValue, VV>
implements ResultTypeQueryable<VV> {
@@ -129,7 +128,6 @@ public class GSAConnectedComponents<K, VV extends Comparable<VV>, EV>
}
}
- @SuppressWarnings("serial")
private static final class UpdateComponentId<K, VV extends Comparable<VV>>
extends ApplyFunction<K, VV, VV>
implements ResultTypeQueryable<VV> {
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 96e5afc..0064a68 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.graph.utils.GraphUtils.MapTo;
import org.apache.flink.types.NullValue;
import java.util.HashMap;
@@ -76,7 +77,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
// iteratively adopt the most frequent label among the neighbors of each vertex
return input
- .mapEdges(new NullValueEdgeMapper<K, EV>())
+ .mapEdges(new MapTo<Edge<K, EV>, NullValue>(NullValue.getInstance()))
.runScatterGatherIteration(
new SendNewLabelToNeighbors<K, VV>(valueType), new UpdateVertexLabel<K, VV>(), maxIterations)
.getVertices();
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 236272f..6fe753a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -35,9 +35,9 @@ import org.apache.flink.graph.EdgeOrder;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 9aca8a4..eda5c1c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -32,8 +32,8 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index 57743e8..1dfa3ee 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -40,6 +40,8 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
import org.apache.flink.graph.library.link_analysis.PageRank.Result;
import org.apache.flink.graph.utils.GraphUtils;
@@ -500,7 +502,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
* @param <T> ID type
*/
public static class Result<T>
- extends Tuple2<T, DoubleValue> {
+ extends Tuple2<T, DoubleValue>
+ implements PrintableResult, UnaryResult<T> {
public static final int HASH_SEED = 0x4010af29;
private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
@@ -518,7 +521,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
return f1;
}
- public String toVerboseString() {
+ @Override
+ public String toPrintableString() {
return "Vertex ID: " + getVertexId0()
+ ", PageRank score: " + getPageRankScore();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 7d77541..6aaf9f2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -34,8 +34,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 3b36715..0c80e6d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
-import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.asm.result.BinaryResult;
+import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 2e0dffc..78fb378 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -20,7 +20,10 @@ package org.apache.flink.graph.utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.LongValue;
import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
@@ -62,7 +65,7 @@ public class GraphUtils {
* @param <O> output type
*/
public static class MapTo<I, O>
- implements MapFunction<I, O> {
+ implements MapFunction<I, O>, ResultTypeQueryable<O> {
private final O value;
/**
@@ -78,6 +81,11 @@ public class GraphUtils {
public O map(I o) throws Exception {
return value;
}
+
+ @Override
+ public TypeInformation<O> getProducedType() { // ResultTypeQueryable hook: reports the type of the constant this mapper emits
+ return (TypeInformation<O>)TypeExtractor.createTypeInfo(value.getClass()); // unchecked cast; the type info is built from the runtime class of value - NOTE(review): a null value would throw here and any generic parameters of O are not visible to getClass(), confirm callers only pass non-null, non-generic constants
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
deleted file mode 100644
index 2bd4719..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> {
-
- private static final long serialVersionUID = 1L;
-
- public NullValue map(Edge<K, EV> edge) {
- return NullValue.getInstance();
- }
-}