You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:34 UTC
[05/10] flink git commit: [gelly] [refactoring] Removed Example end
string from all gelly examples
[gelly] [refactoring] Removed the "Example" suffix from the class names of all Gelly examples
Added the "Algorithm" suffix to the class names of the library methods
This closes #625
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f4039dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f4039dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f4039dc
Branch: refs/heads/master
Commit: 8f4039dcb326a1af276ac4b93ffe5ea4da3e19bc
Parents: 1479973
Author: andralungu <lu...@gmail.com>
Authored: Sun Apr 26 20:09:04 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:03 2015 +0200
----------------------------------------------------------------------
.../flink/graph/example/CommunityDetection.java | 140 ++++++++++++
.../graph/example/ConnectedComponents.java | 141 ++++++++++++
.../example/ConnectedComponentsExample.java | 141 ------------
.../graph/example/EuclideanGraphExample.java | 210 ------------------
.../graph/example/EuclideanGraphWeighing.java | 210 ++++++++++++++++++
.../graph/example/GSAConnectedComponents.java | 176 +++++++++++++++
.../example/GSAConnectedComponentsExample.java | 176 ---------------
.../example/GSASingleSourceShortestPaths.java | 178 +++++++++++++++
.../GSASingleSourceShortestPathsExample.java | 178 ---------------
.../graph/example/JaccardSimilarityMeasure.java | 214 +++++++++++++++++++
.../JaccardSimilarityMeasureExample.java | 214 -------------------
.../flink/graph/example/LabelPropagation.java | 170 +++++++++++++++
.../graph/example/LabelPropagationExample.java | 170 ---------------
.../flink/graph/example/MusicProfiles.java | 4 +-
.../apache/flink/graph/example/PageRank.java | 149 +++++++++++++
.../flink/graph/example/PageRankExample.java | 149 -------------
.../SimpleCommunityDetectionExample.java | 140 ------------
.../example/SingleSourceShortestPaths.java | 133 ++++++++++++
.../SingleSourceShortestPathsExample.java | 133 ------------
.../example/utils/CommunityDetectionData.java | 65 ++++++
.../utils/ConnectedComponentsDefaultData.java | 52 +++++
.../utils/ConnectedComponentsExampleData.java | 52 -----
.../utils/EdgeWithLongIdNullValueParser.java | 33 ---
.../graph/example/utils/EuclideanGraphData.java | 10 +-
.../utils/SimpleCommunityDetectionData.java | 65 ------
.../utils/SingleSourceShortestPathsData.java | 4 +
.../library/CommunityDetectionAlgorithm.java | 172 +++++++++++++++
.../graph/library/ConnectedComponents.java | 88 --------
.../library/ConnectedComponentsAlgorithm.java | 88 ++++++++
.../flink/graph/library/LabelPropagation.java | 111 ----------
.../library/LabelPropagationAlgorithm.java | 114 ++++++++++
.../apache/flink/graph/library/PageRank.java | 100 ---------
.../flink/graph/library/PageRankAlgorithm.java | 104 +++++++++
.../graph/library/SimpleCommunityDetection.java | 172 ---------------
.../library/SingleSourceShortestPaths.java | 108 ----------
.../SingleSourceShortestPathsAlgorithm.java | 111 ++++++++++
.../flink/graph/test/GatherSumApplyITCase.java | 10 +-
.../test/example/CommunityDetectionITCase.java | 100 +++++++++
.../test/example/ConnectedComponentsITCase.java | 10 +-
...ctedComponentsWithRandomisedEdgesITCase.java | 4 +-
.../example/EuclideanGraphExampleITCase.java | 77 -------
.../example/EuclideanGraphWeighingITCase.java | 77 +++++++
.../JaccardSimilarityMeasureExampleITCase.java | 72 -------
.../example/JaccardSimilarityMeasureITCase.java | 72 +++++++
.../example/LabelPropagationExampleITCase.java | 143 -------------
.../test/example/LabelPropagationITCase.java | 143 +++++++++++++
.../example/SimpleCommunityDetectionITCase.java | 100 ---------
.../SingleSourceShortestPathsITCase.java | 4 +-
48 files changed, 2634 insertions(+), 2653 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
new file mode 100644
index 0000000..f9434d3
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetectionAlgorithm;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm}
+ * library method:
+ * <ul>
+ * <li> with the edge data set given as a parameter
+ * <li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
+ * 1-2 with weight 1.0 and 1-3 with weight 2.0.
+ *
+ * Usage <code>CommunityDetection &lt;edge path&gt; &lt;output path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.CommunityDetectionData}
+ */
+public class CommunityDetection implements ProgramDescription {
+
+ @SuppressWarnings("serial")
+ public static void main(String [] args) throws Exception {
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // set up the graph; vertex values are produced from vertex ids by the identity mapper below
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+ Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+ new MapFunction<Long, Long>() {
+ @Override
+ public Long map(Long label) throws Exception {
+ return label;
+ }
+ }, env);
+
+ // the result is in the form of <vertexId, communityId>, where the communityId is the label
+ // which the vertex converged to
+ DataSet<Vertex<Long, Long>> communityVertices =
+ graph.run(new CommunityDetectionAlgorithm(maxIterations, delta)).getVertices();
+
+ // emit result
+ if (fileOutput) {
+ communityVertices.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ communityVertices.print();
+ }
+
+ env.execute("Executing Community Detection Example");
+ }
+
+ @Override
+ public String getDescription() {
+ return "Community Detection";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+ private static Integer maxIterations = CommunityDetectionData.MAX_ITERATIONS;
+ private static Double delta = CommunityDetectionData.DELTA;
+
+ /** Parses the program arguments; returns false if a non-empty argument list does not have exactly 4 entries. */
+ private static boolean parseParameters(String [] args) {
+ if(args.length > 0) {
+ if(args.length != 4) {
+ System.err.println("Usage CommunityDetection <edge path> <output path> " +
+ "<num iterations> <delta>");
+ return false;
+ }
+
+ fileOutput = true;
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
+ delta = Double.parseDouble(args[3]);
+
+ } else {
+ System.out.println("Executing CommunityDetection example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("Usage CommunityDetection <edge path> <output path> " +
+ "<num iterations> <delta>");
+ }
+
+ return true;
+ }
+
+ /** Reads the tab-separated edge file when arguments were given, otherwise returns the built-in default edges. */
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .ignoreComments("#")
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new Tuple3ToEdgeMap<Long, Double>());
+ } else {
+ return CommunityDetectionData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
new file mode 100644
index 0000000..b7c9045
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * library method:
+ * <ul>
+ * <li> with the edge data set given as a parameter
+ * <li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;output path&gt;
+ * &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
+ */
+public class ConnectedComponents implements ProgramDescription {
+
+ @SuppressWarnings("serial")
+ public static void main(String [] args) throws Exception {
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ }, env);
+
+ DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
+ .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices();
+
+ // emit result
+ if (fileOutput) {
+ verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ verticesWithMinIds.print();
+ }
+
+ env.execute("Connected Components Example");
+ }
+
+ @Override
+ public String getDescription() {
+ return "Connected Components Example";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+ private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
+
+ /** Parses the program arguments; returns false if a non-empty argument list does not have exactly 3 entries. */
+ private static boolean parseParameters(String [] args) {
+ if(args.length > 0) {
+ if(args.length != 3) {
+ System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+ "<num iterations>");
+ return false;
+ }
+
+ fileOutput = true;
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
+
+ } else {
+ System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+ "<num iterations>");
+ }
+
+ return true;
+ }
+
+ /** Reads tab-separated (src, trg) pairs as NullValue-weighted edges when arguments were given, else the default edges. */
+ @SuppressWarnings("serial")
+ private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .ignoreComments("#")
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+ @Override
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+ return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+ }
+ });
+ } else {
+ return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
deleted file mode 100644
index a185a70..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponents}
- * library method:
- * <ul>
- * <li> with the edge data set given as a parameter
- * <li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents <edge path> <result path>
- * <number of iterations> </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.ConnectedComponentsExampleData}
- */
-public class ConnectedComponentsExample implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String [] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
- @Override
- public Long map(Long value) throws Exception {
- return value;
- }
- }, env);
-
- DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
- .run(new ConnectedComponents(maxIterations)).getVertices();
-
- // emit result
- if (fileOutput) {
- verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
- } else {
- verticesWithMinIds.print();
- }
-
- env.execute("Connected Components Example");
- }
-
- @Override
- public String getDescription() {
- return "Connected Components Example";
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static Integer maxIterations = ConnectedComponentsExampleData.MAX_ITERATIONS;
-
- private static boolean parseParameters(String [] args) {
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
-
- } else {
- System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
- System.out.println("Provide parameters to read input data from files.");
- System.out.println("Usage ConnectedComponents <edge path> <output path> " +
- "<num iterations>");
- }
-
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .ignoreComments("#")
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
- @Override
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- } else {
- return ConnectedComponentsExampleData.getDefaultEdgeDataSet(env);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
deleted file mode 100644
index fa08084..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-
-import java.io.Serializable;
-
-/**
- * Given a directed, unweighted graph, with vertex values representing points in a plan,
- * return a weighted graph where the edge weights are equal to the Euclidean distance between the
- * src and the trg vertex values.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
- * the value being formed of two doubles separated by a comma.
- * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
- * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
- * Edges themselves are separated by newlines.
- * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
- * </ul>
- * </p>
- *
- * Usage <code>EuclideanGraphExample <vertex path> <edge path> <result path></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
- */
-@SuppressWarnings("serial")
-public class EuclideanGraphExample implements ProgramDescription {
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
-
- DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
- Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
- // the edge value will be the Euclidean distance between its src and trg vertex
- DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
- .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
-
- @Override
- public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
- throws Exception {
-
- Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
- Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
-
- return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
- srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
- }
- });
-
- Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
- new MapFunction<Tuple2<Double, Double>, Double>() {
-
- @Override
- public Double map(Tuple2<Double, Double> distance) throws Exception {
- return distance.f1;
- }
- });
-
- // retrieve the edges from the final result
- DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
-
- // emit result
- if (fileOutput) {
- result.writeAsCsv(outputPath, "\n", ",");
- } else {
- result.print();
- }
-
- env.execute("Euclidean Graph Example");
- }
-
- @Override
- public String getDescription() {
- return "Weighing a graph by computing the Euclidean distance " +
- "between its vertices";
- }
-
- // *************************************************************************
- // DATA TYPES
- // *************************************************************************
-
- /**
- * A simple two-dimensional point.
- */
- public static class Point implements Serializable {
-
- public double x, y;
-
- public Point() {}
-
- public Point(double x, double y) {
- this.x = x;
- this.y = y;
- }
-
- public double euclideanDistance(Point other) {
- return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
- }
-
- @Override
- public String toString() {
- return x + " " + y;
- }
- }
-
- // ******************************************************************************************************************
- // UTIL METHODS
- // ******************************************************************************************************************
-
- private static boolean fileOutput = false;
-
- private static String verticesInputPath = null;
-
- private static String edgesInputPath = null;
-
- private static String outputPath = null;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- if (args.length == 3) {
- fileOutput = true;
- verticesInputPath = args[0];
- edgesInputPath = args[1];
- outputPath = args[2];
- } else {
- System.out.println("Executing Euclidean Graph example with default parameters and built-in default data.");
- System.out.println("Provide parameters to read input data from files.");
- System.out.println("See the documentation for the correct format of input files.");
- System.err.println("Usage: EuclideanGraphExample <input vertices path> <input edges path>" +
- " <output path>");
- return false;
- }
- }
- return true;
- }
-
- private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(verticesInputPath)
- .lineDelimiter("\n")
- .types(Long.class, Double.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
-
- @Override
- public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
- return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
- }
- });
- } else {
- return EuclideanGraphData.getDefaultVertexDataSet(env);
- }
- }
-
- private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgesInputPath)
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-
- @Override
- public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
- return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
- }
- });
- } else {
- return EuclideanGraphData.getDefaultEdgeDataSet(env);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
new file mode 100644
index 0000000..565ef69
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
+ * the value being formed of two doubles separated by a comma.
+ * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
+ * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
+ * Edges themselves are separated by newlines.
+ * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphWeighing implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// the edge value will be the Euclidean distance between its src and trg vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
+				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+					@Override
+					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+							throws Exception {
+
+						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+
+						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+					}
+				});
+
+		// overwrite each edge value with the distance computed above, which arrives as
+		// field f1 of the (current edge value, joined value) pair
+		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> distance) throws Exception {
+						return distance.f1;
+					}
+				});
+
+		// retrieve the edges from the final result
+		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Euclidean Graph Weighing Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Weighing a graph by computing the Euclidean distance " +
+				"between its vertices";
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		/**
+		 * Computes the Euclidean (straight-line) distance between this point and another one.
+		 *
+		 * @param other the point to measure the distance to
+		 * @return sqrt((x - other.x)^2 + (y - other.y)^2)
+		 */
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	// true when paths were given on the command line: input is read from files and the
+	// result is written to a file; otherwise default data is used and the result is printed
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	/**
+	 * Parses the command line arguments.
+	 *
+	 * <p>With no arguments the program runs on the built-in default data; otherwise exactly three
+	 * arguments are expected: vertices path, edges path and output path.
+	 *
+	 * @param args the command line arguments
+	 * @return true if the program should run, false if the arguments were invalid
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 3) {
+				fileOutput = true;
+				verticesInputPath = args[0];
+				edgesInputPath = args[1];
+				outputPath = args[2];
+			} else {
+				// wrong number of arguments: report the expected usage and abort
+				System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
+						" <output path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("See the documentation for the correct format of input files.");
+			System.out.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
+					" <output path>");
+		}
+		return true;
+	}
+
+	/**
+	 * Reads the vertices as <code>id,x,y</code> lines when file input is enabled;
+	 * otherwise returns the default vertex data set.
+	 */
+	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(verticesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
+
+						@Override
+						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
+							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	/**
+	 * Reads the edges as <code>src,trg</code> lines when file input is enabled, using 0.0 as the
+	 * initial edge value (it is overwritten with the computed distance); otherwise returns the
+	 * default edge data set.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultEdgeDataSet(env);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
new file mode 100755
index 0000000..30855e4
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ *
+ * <p>
+ * Every vertex starts with its own id as value. In each iteration, a vertex gathers the values of its
+ * neighbors, keeps the minimum and adopts it if it is smaller than its current value. The iteration
+ * is bounded by <code>max iterations</code>; the resulting vertex value is the component id.
+ *
+ * <p>
+ * Edges are read from a tab-separated file of <code>srcVertexId\ttrgVertexId</code> lines.
+ *
+ * Usage <code>GSAConnectedComponents &lt;edge path&gt; &lt;result path&gt; &lt;max iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with built-in default data consisting of
+ * three components of two vertices each.
+ */
+public class GSAConnectedComponents implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	// Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Long, NullValue> result =
+				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			connectedComponents.print();
+		}
+
+		env.execute("GSA Connected Components");
+	}
+
+	/**
+	 * Initializes every vertex value with the vertex id, i.e. each vertex starts in its own component.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gather: emits the current value (component id) of a neighbor.
+	 */
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		@Override
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	/**
+	 * Sum: reduces the gathered component ids to their minimum.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		@Override
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	/**
+	 * Apply: adopts the minimum gathered id if it is smaller than the vertex's current value;
+	 * otherwise no new result is set.
+	 */
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+		@Override
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	/**
+	 * Parses the command line arguments.
+	 *
+	 * <p>With no arguments the program runs on the built-in default data; otherwise exactly three
+	 * arguments are expected: edge path, result path and maximum number of iterations.
+	 *
+	 * @param args the command line arguments
+	 * @return true if the program should run, false if the arguments were invalid
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length != 3) {
+				System.err.println("Usage: GSAConnectedComponents <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			// parse input arguments; only switch to file I/O once they are known to be valid
+			try {
+				maxIterations = Integer.parseInt(args[2]);
+			} catch (NumberFormatException e) {
+				System.err.println("<max iterations> must be an integer, but was: " + args[2]);
+				System.err.println("Usage: GSAConnectedComponents <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println(" Provide parameters to read input data from files.");
+			System.out.println(" See the documentation for the correct format of input files.");
+			System.out.println(" Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Reads the edges from a tab-separated file when file input is enabled; otherwise generates
+	 * the default edges.
+	 */
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// Generates 3 components of size 2: the edges 0-3, 1-4 and 2-5
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
deleted file mode 100755
index 7c39123..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
- */
-public class GSAConnectedComponentsExample implements ProgramDescription {
-
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
- // Execute the GSA iteration
- Graph<Long, Long, NullValue> result =
- graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
- new UpdateComponentId(), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
-
- // emit result
- if (fileOutput) {
- connectedComponents.writeAsCsv(outputPath, "\n", " ");
- } else {
- connectedComponents.print();
- }
-
- env.execute("GSA Connected Components");
- }
-
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Long> {
-
- public Long map(Long vertexId) {
- return vertexId;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Connected Components UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
- public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getNeighborValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
- public Long sum(Long newValue, Long currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
- public void apply(Long summedValue, Long origValue) {
- if (summedValue < origValue) {
- setResult(summedValue);
- }
- }
- };
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static int maxIterations = 16;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if (args.length != 3) {
- System.err.println("Usage: GSAConnectedComponentsExample <edge path> " +
- "<result path> <max iterations>");
- return false;
- }
-
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing GSA Connected Components example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAConnectedComponentsExample <edge path> <result path> <max iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- }
-
- // Generates 3 components of size 2
- return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
- @Override
- public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
- out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Connected Components";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..bbc344f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
+ *
+ * <p>
+ * The source vertex starts with distance 0.0 and every other vertex with positive infinity. In each
+ * iteration, a vertex computes candidate distances as a neighbor's distance plus the connecting edge
+ * weight, keeps the minimum and adopts it if it is shorter than its current distance. The iteration
+ * is bounded by <code>num iterations</code>.
+ *
+ * <p>
+ * Edges are read from a tab-separated file of <code>srcVertexId\ttrgVertexId\tweight</code> lines.
+ *
+ * Usage <code>GSASingleSourceShortestPaths &lt;source vertex id&gt; &lt;input edges path&gt;
+ * &lt;output path&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	// Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Double, Double> result = graph
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+		env.execute("GSA Single Source Shortest Paths");
+	}
+
+	/**
+	 * Initializes the source vertex with distance 0.0 and every other vertex with positive infinity.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double> {
+
+		private final long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		@Override
+		public Double map(Long id) {
+			return id == srcId ? 0.0 : Double.POSITIVE_INFINITY;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gather: the candidate distance through a neighbor is the neighbor's distance plus the edge weight.
+	 */
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+		@Override
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+		}
+	}
+
+	/**
+	 * Sum: keeps the shortest of the candidate distances.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	/**
+	 * Apply: adopts the new distance only if it is shorter than the vertex's current distance.
+	 */
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+		@Override
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+
+	private static long srcVertexId = 1L;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	/**
+	 * Parses the command line arguments.
+	 *
+	 * <p>With no arguments the program runs on the built-in default data; otherwise exactly four
+	 * arguments are expected: source vertex id, edges path, output path and number of iterations.
+	 *
+	 * @param args the command line arguments
+	 * @return true if the program should run, false if the arguments were invalid
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			// parse input arguments; only switch to file I/O once they are known to be valid
+			try {
+				srcVertexId = Long.parseLong(args[0]);
+				maxIterations = Integer.parseInt(args[3]);
+			} catch (NumberFormatException e) {
+				System.err.println("<source vertex id> and <num iterations> must be integers.");
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+			fileOutput = true;
+			edgesInputPath = args[1];
+			outputPath = args[2];
+		} else {
+			System.out.println("Executing GSA Single Source Shortest Paths example "
+					+ "with default parameters and built-in default data.");
+			System.out.println(" Provide parameters to read input data from files.");
+			System.out.println(" See the documentation for the correct format of input files.");
+			System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+					" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Reads the weighted edges from a tab-separated file when file input is enabled; otherwise
+	 * returns the default edge data set.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Single Source Shortest Paths";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
deleted file mode 100755
index 75cbd78..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
- */
-public class GSASingleSourceShortestPathsExample implements ProgramDescription {
-
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
- Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
-
- // Execute the GSA iteration
- Graph<Long, Double, Double> result = graph
- .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
- new UpdateDistance(), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
-
- // emit result
- if(fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
- } else {
- singleSourceShortestPaths.print();
- }
-
- env.execute("GSA Single Source Shortest Paths Example");
- }
-
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Double>{
-
- private long srcId;
-
- public InitVertices(long srcId) {
- this.srcId = srcId;
- }
-
- public Double map(Long id) {
- if (id.equals(srcId)) {
- return 0.0;
- }
- else {
- return Double.POSITIVE_INFINITY;
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Single Source Shortest Path UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
-
- public Double gather(Neighbor<Double, Double> neighbor) {
- return neighbor.getNeighborValue() + neighbor.getEdgeValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
-
- public Double sum(Double newValue, Double currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
-
- public void apply(Double newDistance, Double oldDistance) {
- if (newDistance < oldDistance) {
- setResult(newDistance);
- }
- }
- };
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
-
- private static Long srcVertexId = 1l;
-
- private static String edgesInputPath = null;
-
- private static String outputPath = null;
-
- private static int maxIterations = 5;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- srcVertexId = Long.parseLong(args[0]);
- edgesInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
- } else {
- System.out.println("Executing GSASingle Source Shortest Paths example "
- + "with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- }
- return true;
- }
-
- private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgesInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class, Double.class)
- .map(new Tuple3ToEdgeMap<Long, Double>());
- } else {
- return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
- }
- }
-
- @Override
- public String getDescription() {
- return "GSA Single Source Shortest Paths";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
new file mode 100644
index 0000000..dddaf41
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * Edges themselves are separated by newlines.
+ * Lines starting with {@code #} are treated as comments and skipped.
+ * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage {@code JaccardSimilarityMeasure <edge path> <result path>}<br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasure implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+		// every vertex gets the set of ids of all its neighbors (over in- and out-edges) as its value
+		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+				graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+
+		// the edge value will be the Jaccard similarity coefficient (number of common neighbors / all neighbors)
+		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+				.map(new WeighEdgesMapper());
+
+		// keep f1 of each joined pair - presumably the computed weight rather than the old edge value
+		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> value) throws Exception {
+						return value.f1;
+					}
+				}).getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Executing Jaccard Similarity Measure");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex Jaccard Similarity Measure";
+	}
+
+	/**
+	 * Each vertex will have a HashSet containing its neighbor ids as value.
+	 */
+	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
+				Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
+
+			HashSet<Long> neighborsHashSet = new HashSet<Long>();
+			long vertexId = -1;
+
+			// f0 of every tuple is the id of the vertex whose edges are being grouped
+			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+				neighborsHashSet.add(getNeighborID(edge));
+				vertexId = edge.f0;
+			}
+			out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
+		}
+	}
+
+	/**
+	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
+	 *
+	 * Consider the edge x-y, with N(x) and N(y) the neighbor id sets of x and y respectively.
+	 * The intersection size is counted by probing the larger set with every element of the smaller one,
+	 * and the union size is then |N(x)| + |N(y)| - intersection.
+	 *
+	 * The Jaccard similarity coefficient is then, the intersection/union.
+	 * The neighbor sets are only read, never modified, so the input triplet is left untouched.
+	 */
+	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
+			Tuple3<Long, Long, Double>> {
+
+		@Override
+		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
+				throws Exception {
+
+			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+			HashSet<Long> sourceNeighbors = source.getValue();
+			HashSet<Long> targetNeighbors = target.getValue();
+
+			// iterate over the smaller set and probe the larger one
+			boolean sourceIsSmaller = sourceNeighbors.size() <= targetNeighbors.size();
+			HashSet<Long> smaller = sourceIsSmaller ? sourceNeighbors : targetNeighbors;
+			HashSet<Long> larger = sourceIsSmaller ? targetNeighbors : sourceNeighbors;
+
+			long intersection = 0;
+			for (Long neighbor : smaller) {
+				if (larger.contains(neighbor)) {
+					intersection++;
+				}
+			}
+			long union = sourceNeighbors.size() + targetNeighbors.size() - intersection;
+
+			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection / union);
+		}
+	}
+
+	/**
+	 * Helper method that extracts the neighborId given an edge.
+	 *
+	 * @param edge a pair of (id of the vertex being processed, one of its edges)
+	 * @return the id of the edge endpoint other than {@code edge.f0}
+	 */
+	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+		// equals() instead of '==': the ids are boxed Longs, and '==' compares references,
+		// which only happens to work for ids inside the Long cache (-128..127)
+		if(edge.f1.getSource().equals(edge.f0)) {
+			return edge.f1.getTarget();
+		} else {
+			return edge.f1.getSource();
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	// set by parseParameters: true when the edge input path and output path were passed on the command line
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							// placeholder value, replaced by the Jaccard coefficient in main()
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
deleted file mode 100644
index 2783a29..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
- * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size
- * of the union of neighbor sets - for the src and target vertices.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <br>
- * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
- * Edges themselves are separated by newlines.
- * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3.
- * </p>
- *
- * Usage <code> JaccardSimilarityMeasureExample <edge path> <result path></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
- */
-@SuppressWarnings("serial")
-public class JaccardSimilarityMeasureExample implements ProgramDescription {
-
- public static void main(String [] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
- Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
-
- DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
- graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
-
- Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
-
- // the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
- DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
- .map(new WeighEdgesMapper());
-
- DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
- new MapFunction<Tuple2<Double, Double>, Double>() {
-
- @Override
- public Double map(Tuple2<Double, Double> value) throws Exception {
- return value.f1;
- }
- }).getEdges();
-
- // emit result
- if (fileOutput) {
- result.writeAsCsv(outputPath, "\n", ",");
- } else {
- result.print();
- }
-
- env.execute("Executing Jaccard Similarity Measure");
- }
-
- @Override
- public String getDescription() {
- return "Vertex Jaccard Similarity Measure";
- }
-
- /**
- * Each vertex will have a HashSet containing its neighbor ids as value.
- */
- private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
-
- @Override
- public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
- Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
-
- HashSet<Long> neighborsHashSet = new HashSet<Long>();
- long vertexId = -1;
-
- for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
- neighborsHashSet.add(getNeighborID(edge));
- vertexId = edge.f0;
- }
- out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
- }
- }
-
- /**
- * The edge weight will be the Jaccard coefficient, which is computed as follows:
- *
- * Consider the edge x-y
- * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
- * sizeX+sizeY = union + intersection of neighborhoods
- * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
- * The intersection can then be deduced.
- *
- * The Jaccard similarity coefficient is then, the intersection/union.
- */
- private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
- Tuple3<Long, Long, Double>> {
-
- @Override
- public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
- throws Exception {
-
- Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
- Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
-
- long unionPlusIntersection = source.getValue().size() + target.getValue().size();
- // within a HashSet, all elements are distinct
- source.getValue().addAll(target.getValue());
- // the source value contains the union
- long union = source.getValue().size();
- long intersection = unionPlusIntersection - union;
-
- return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
- }
- }
-
- /**
- * Helper method that extracts the neighborId given an edge.
- * @param edge
- * @return
- */
- private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
- if(edge.f1.getSource() == edge.f0) {
- return edge.f1.getTarget();
- } else {
- return edge.f1.getSource();
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static boolean parseParameters(String [] args) {
- if(args.length > 0) {
- if(args.length != 2) {
- System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- } else {
- System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
- System.out.println("Provide parameters to read input data from files.");
- System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
- }
-
- return true;
- }
-
- private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .ignoreComments("#")
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
- @Override
- public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
- return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
- }
- });
- } else {
- return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
new file mode 100644
index 0000000..4012a4e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.LabelPropagationAlgorithm;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example uses the label propagation algorithm to detect communities by
+ * propagating labels. The vertices iteratively propagate their labels to their
+ * neighbors and adopt the most frequent label among their neighbors. The algorithm
+ * converges when no vertex changes value or the maximum number of iterations has
+ * been reached.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs
+ * in the following format: {@code <sourceVertexID>\t<targetVertexID>}.
+ *
+ * The vertices input file is expected to contain one vertex per line, with long IDs
+ * and long vertex values (the initial labels), in the following format:
+ * {@code <vertexID>\t<vertexValue>}.
+ *
+ * If no arguments are provided, the example runs with a random graph of 100 vertices,
+ * where each vertex is initially labeled with its own id and the random edges may
+ * contain duplicates and self-loops.
+ */
+public class LabelPropagation implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// Set up the execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// Set up the graph
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Set up the program
+		DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
+				new LabelPropagationAlgorithm<Long>(maxIterations)).getVertices();
+
+		// Emit results
+		if(fileOutput) {
+			verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			verticesWithCommunity.print();
+		}
+
+		// Execute the program
+		env.execute("Label Propagation Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	// set by parseParameters: true when vertex, edge and output paths were passed on the command line
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	// number of vertices of the random graph generated when no input files are given
+	private static long numVertices = 100;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
+			System.out.println(" Provide parameters to read input data from files.");
+			System.out.println(" See the documentation for the correct format of input files.");
+			System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(vertexInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new Tuple2ToVertexMap<Long, Long>());
+		}
+
+		// default data: vertices 1..numVertices, each labeled with its own id
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, Long>>() {
+					@Override
+					public Vertex<Long, Long> map(Long l) throws Exception {
+						return new Vertex<Long, Long>(l, l);
+					}
+				});
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// default data: each vertex gets between 0 and numVertices/2 - 1 out-edges to random
+		// targets in [1, numVertices], so duplicate edges and self-loops are possible
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, NullValue>> out) {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
+	}
+
+	@Override
+	public String getDescription() {
+		return "Label Propagation Example";
+	}
+}