You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/08/22 21:39:32 UTC
[1/4] flink git commit: [FLINK-2451] [gelly] library methods cleanup
Repository: flink
Updated Branches:
refs/heads/master 3a8302998 -> 5ae84273c
[FLINK-2451] [gelly] library methods cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2d061c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2d061c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2d061c3
Branch: refs/heads/master
Commit: d2d061c3004dcdc31a1df6b422a6aba6d9c294f8
Parents: 970ab35
Author: vasia <va...@apache.org>
Authored: Mon Aug 3 15:20:44 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:46:20 2015 +0200
----------------------------------------------------------------------
.../graph/example/ConnectedComponents.java | 4 +-
.../flink/graph/example/MusicProfiles.java | 4 +-
.../flink/graph/library/CommunityDetection.java | 173 +++++++++++++++++++
.../library/CommunityDetectionAlgorithm.java | 173 -------------------
.../graph/library/ConnectedComponents.java | 88 ++++++++++
.../library/ConnectedComponentsAlgorithm.java | 89 ----------
.../graph/library/GSAConnectedComponents.java | 127 ++------------
.../apache/flink/graph/library/GSAPageRank.java | 161 +++--------------
.../library/GSASingleSourceShortestPaths.java | 126 +++-----------
.../flink/graph/library/LabelPropagation.java | 113 ++++++++++++
.../library/LabelPropagationAlgorithm.java | 115 ------------
.../apache/flink/graph/library/PageRank.java | 105 +++++++++++
.../flink/graph/library/PageRankAlgorithm.java | 105 -----------
.../library/SingleSourceShortestPaths.java | 111 ++++++++++++
.../SingleSourceShortestPathsAlgorithm.java | 111 ------------
15 files changed, 653 insertions(+), 952 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index b841ced..a4a6708 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -27,7 +27,7 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
+import org.apache.flink.graph.library.GSAConnectedComponents;
import org.apache.flink.types.NullValue;
/**
@@ -69,7 +69,7 @@ public class ConnectedComponents implements ProgramDescription {
}, env);
DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
- .run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices();
+ .run(new GSAConnectedComponents(maxIterations)).getVertices();
// emit result
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 7643976..a56224d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -38,7 +38,7 @@ import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.MusicProfilesData;
-import org.apache.flink.graph.library.LabelPropagationAlgorithm;
+import org.apache.flink.graph.library.LabelPropagation;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
@@ -153,7 +153,7 @@ public class MusicProfiles implements ProgramDescription {
public Long map(Tuple2<Long, Long> value) {
return value.f1;
}
- }).run(new LabelPropagationAlgorithm<String>(maxIterations))
+ }).run(new LabelPropagation<String>(maxIterations))
.getVertices();
if (fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
new file mode 100644
index 0000000..21bef53
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ */
+public class CommunityDetection implements GraphAlgorithm<Long, Long, Double> {
+
+ private Integer maxIterations;
+
+ private Double delta;
+
+ public CommunityDetection(Integer maxIterations, Double delta) {
+
+ this.maxIterations = maxIterations;
+ this.delta = delta;
+ }
+
+ @Override
+ public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+
+ Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+
+ Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
+ .mapVertices(new AddScoreToVertexValuesMapper());
+
+ return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
+ new LabelMessenger(), maxIterations)
+ .mapVertices(new RemoveScoreFromVertexValuesMapper());
+ }
+
+ @SuppressWarnings("serial")
+ public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+ private Double delta;
+
+ public VertexLabelUpdater(Double delta) {
+ this.delta = delta;
+ }
+
+ @Override
+ public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
+ MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+ // we would like these two maps to be ordered
+ Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+ Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+ for (Tuple2<Long, Double> message : inMessages) {
+ // split the message into received label and score
+ Long receivedLabel = message.f0;
+ Double receivedScore = message.f1;
+
+ // if the label was received before
+ if (receivedLabelsWithScores.containsKey(receivedLabel)) {
+ Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
+ receivedLabelsWithScores.put(receivedLabel, newScore);
+ } else {
+ // first time we see the label
+ receivedLabelsWithScores.put(receivedLabel, receivedScore);
+ }
+
+ // store the labels with the highest scores
+ if (labelsWithHighestScore.containsKey(receivedLabel)) {
+ Double currentScore = labelsWithHighestScore.get(receivedLabel);
+ if (currentScore < receivedScore) {
+ // record the highest score
+ labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ } else {
+ // first time we see this label
+ labelsWithHighestScore.put(receivedLabel, receivedScore);
+ }
+ }
+
+ if(receivedLabelsWithScores.size() > 0) {
+ // find the label with the highest score from the ones received
+ Double maxScore = -Double.MAX_VALUE;
+ Long maxScoreLabel = vertex.getValue().f0;
+ for (Long curLabel : receivedLabelsWithScores.keySet()) {
+
+ if (receivedLabelsWithScores.get(curLabel) > maxScore) {
+ maxScore = receivedLabelsWithScores.get(curLabel);
+ maxScoreLabel = curLabel;
+ }
+ }
+
+ // find the highest score of maxScoreLabel
+ Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+ // re-score the new label
+ if (maxScoreLabel != vertex.getValue().f0) {
+ highestScore -= delta / getSuperstepNumber();
+ }
+ // else delta = 0
+ // update own label
+ setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+ Tuple2<Long, Double>, Double> {
+
+ @Override
+ public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+
+ for(Edge<Long, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+ vertex.getValue().f1 * edge.getValue()));
+ }
+
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+
+ @Override
+ public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
+ return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+
+ @Override
+ public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+ return vertex.getValue().f0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
deleted file mode 100644
index 6f72deb..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Community Detection Algorithm.
- *
- * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
- * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
- * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
- * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- *
- * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- */
-public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, Double> {
-
- private Integer maxIterations;
-
- private Double delta;
-
- public CommunityDetectionAlgorithm(Integer maxIterations, Double delta) {
-
- this.maxIterations = maxIterations;
- this.delta = delta;
- }
-
- @Override
- public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
-
- Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
-
- Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
- .mapVertices(new AddScoreToVertexValuesMapper());
-
- return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
- new LabelMessenger(), maxIterations)
- .mapVertices(new RemoveScoreFromVertexValuesMapper());
- }
-
- @SuppressWarnings("serial")
- public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
- private Double delta;
-
- public VertexLabelUpdater(Double delta) {
- this.delta = delta;
- }
-
- @Override
- public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
- MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
-
- // we would like these two maps to be ordered
- Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
- Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
-
- for (Tuple2<Long, Double> message : inMessages) {
- // split the message into received label and score
- Long receivedLabel = message.f0;
- Double receivedScore = message.f1;
-
- // if the label was received before
- if (receivedLabelsWithScores.containsKey(receivedLabel)) {
- Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
- receivedLabelsWithScores.put(receivedLabel, newScore);
- } else {
- // first time we see the label
- receivedLabelsWithScores.put(receivedLabel, receivedScore);
- }
-
- // store the labels with the highest scores
- if (labelsWithHighestScore.containsKey(receivedLabel)) {
- Double currentScore = labelsWithHighestScore.get(receivedLabel);
- if (currentScore < receivedScore) {
- // record the highest score
- labelsWithHighestScore.put(receivedLabel, receivedScore);
- }
- } else {
- // first time we see this label
- labelsWithHighestScore.put(receivedLabel, receivedScore);
- }
- }
-
- if(receivedLabelsWithScores.size() > 0) {
- // find the label with the highest score from the ones received
- Double maxScore = -Double.MAX_VALUE;
- Long maxScoreLabel = vertex.getValue().f0;
- for (Long curLabel : receivedLabelsWithScores.keySet()) {
-
- if (receivedLabelsWithScores.get(curLabel) > maxScore) {
- maxScore = receivedLabelsWithScores.get(curLabel);
- maxScoreLabel = curLabel;
- }
- }
-
- // find the highest score of maxScoreLabel
- Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
- // re-score the new label
- if (maxScoreLabel != vertex.getValue().f0) {
- highestScore -= delta / getSuperstepNumber();
- }
- // else delta = 0
- // update own label
- setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
- }
- }
- }
-
- @SuppressWarnings("serial")
- public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
- Tuple2<Long, Double>, Double> {
-
- @Override
- public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
-
- for(Edge<Long, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
- vertex.getValue().f1 * edge.getValue()));
- }
-
- }
- }
-
- @SuppressWarnings("serial")
- public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
-
- @Override
- public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
- return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
- }
- }
-
- @SuppressWarnings("serial")
- public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
-
- @Override
- public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
- return vertex.getValue().f0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
new file mode 100644
index 0000000..c2cec18
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+/**
+ * A vertex-centric implementation of the Connected components algorithm.
+ *
+ * Initially, each vertex will have its own ID as a value (i.e., it is its own component). The vertices propagate their
+ * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
+ * provided that the value is less than the current minimum.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents implements GraphAlgorithm<Long, Long, NullValue>{
+
+ private Integer maxIterations;
+
+ public ConnectedComponents(Integer maxIterations) {
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+
+ Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+
+ // initialize vertex values and run the Vertex Centric Iteration
+ return undirectedGraph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations);
+ }
+
+ /**
+ * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
+ */
+ public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+
+ @Override
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
+ long min = Long.MAX_VALUE;
+
+ for (long msg : messages) {
+ min = Math.min(min, msg);
+ }
+
+ // update vertex value, if new minimum
+ if (min < vertex.getValue()) {
+ setNewVertexValue(min);
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum ID associated with a given vertex among all the target vertices.
+ */
+ public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
+
+ @Override
+ public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
+ // send current minimum to neighbors
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
deleted file mode 100644
index 7b536e5..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-
-/**
- * Connected components algorithm.
- *
- * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their
- * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
- * provided that the value is less than the current minimum.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- */
-@SuppressWarnings("serial")
-public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long, NullValue>{
-
- private Integer maxIterations;
-
- public ConnectedComponentsAlgorithm(Integer maxIterations) {
- this.maxIterations = maxIterations;
- }
-
- @Override
- public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
-
- Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
-
- // initialize vertex values and run the Vertex Centric Iteration
- return undirectedGraph.runVertexCentricIteration(new CCUpdater(),
- new CCMessenger(), maxIterations);
- }
-
- /**
- * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
- */
- public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
-
- @Override
- public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
- long min = Long.MAX_VALUE;
-
- for (long msg : messages) {
- min = Math.min(min, msg);
- }
-
- // update vertex value, if new minimum
- if (min < vertex.getValue()) {
- setNewVertexValue(min);
- }
- }
- }
-
- /**
- * Distributes the minimum ID associated with a given vertex among all the target vertices.
- */
- public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
-
- @Override
- public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
- // send current minimum to neighbors
- sendMessageToAllNeighbors(vertex.getValue());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index 9b75c92..b77ca07 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -16,71 +16,35 @@
* limitations under the License.
*/
-package org.apache.flink.graph.example;
+package org.apache.flink.graph.library;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
*/
-public class GSAConnectedComponents implements ProgramDescription {
+public class GSAConnectedComponents implements GraphAlgorithm<Long, Long, NullValue> {
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
- // Execute the GSA iteration
- Graph<Long, Long, NullValue> result =
- graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
- new UpdateComponentId(), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
-
- // emit result
- if (fileOutput) {
- connectedComponents.writeAsCsv(outputPath, "\n", " ");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("GSA Connected Components");
- } else {
- connectedComponents.print();
- }
+ private Integer maxIterations;
+ public GSAConnectedComponents(Integer maxIterations) {
+ this.maxIterations = maxIterations;
}
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Long> {
+ @Override
+ public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
- public Long map(Long vertexId) {
- return vertexId;
- }
+ Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+
+ // initialize vertex values and run the gather-sum-apply iteration
+ return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(),
+ maxIterations);
}
// --------------------------------------------------------------------------------------------
@@ -112,67 +76,4 @@ public class GSAConnectedComponents implements ProgramDescription {
}
}
}
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static int maxIterations = 16;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if (args.length != 3) {
- System.err.println("Usage: GSAConnectedComponents <edge path> " +
- "<result path> <max iterations>");
- return false;
- }
-
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing GSA Connected Components example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- }
-
- // Generates 3 components of size 2
- return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
- @Override
- public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
- out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Connected Components";
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 45d4555..4299381 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -16,86 +16,35 @@
* limitations under the License.
*/
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
+package org.apache.flink.graph.library;
+
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.util.Collector;
/**
- * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and no
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices
- * and random edge weights.
+ * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
*/
-public class GSAPageRank implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+ private double beta;
+ private int maxIterations;
- Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
- @Override
- public Double map(Long value) throws Exception {
- return 1.0;
- }
- }, env);
-
- DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
- // Assign the transition probabilities as the edge weights
- Graph<Long, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees,
- new MapFunction<Tuple2<Double, Long>, Double>() {
-
- @Override
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
- }
- });
-
- long numberOfVertices = networkWithWeights.numberOfVertices();
-
- // Execute the GSA iteration
- Graph<Long, Double, Double> result = networkWithWeights
- .runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
- new UpdateRanks(numberOfVertices), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> pageRanks = result.getVertices();
+ public GSAPageRank(double beta, int maxIterations) {
+ this.beta = beta;
+ this.maxIterations = maxIterations;
+ }
- // emit result
- if (fileOutput) {
- pageRanks.writeAsCsv(outputPath, "\n", "\t");
+ @Override
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("GSA Page Ranks");
- } else {
- pageRanks.print();
- }
+ final long numberOfVertices = network.numberOfVertices();
+ return network.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
+ new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
}
// --------------------------------------------------------------------------------------------
@@ -133,83 +82,19 @@ public class GSAPageRank implements ProgramDescription {
}
@SuppressWarnings("serial")
- private static final class UpdateRanks extends ApplyFunction<Long, Double, Double> {
+ private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
- long numberOfVertices;
+ private final double beta;
+ private final long numVertices;
- public UpdateRanks(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
+ public UpdateRanks(double beta, long numberOfVertices) {
+ this.beta = beta;
+ this.numVertices = numberOfVertices;
}
@Override
public void apply(Double rankSum, Double currentValue) {
- setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum);
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static final double DAMPENING_FACTOR = 0.85;
- private static long numPages = 10;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static int maxIterations = 10;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage: GSAPageRank <input edges path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing GSAPageRank example with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAPageRank <input edges path> <output path> <num iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
- public Edge<Long, Double> map(Tuple2<Long, Long> input) {
- return new Edge<Long, Double>(input.f0, input.f1, 1.0);
- }
- }).withForwardedFields("f0; f1");
+ setResult((1-beta)/numVertices + beta * rankSum);
}
-
- return env.generateSequence(1, numPages).flatMap(
- new FlatMapFunction<Long, Edge<Long, Double>>() {
- @Override
- public void flatMap(Long key,
- Collector<Edge<Long, Double>> out) throws Exception {
- int numOutEdges = (int) (Math.random() * (numPages / 2));
- for (int i = 0; i < numOutEdges; i++) {
- long target = (long) (Math.random() * numPages) + 1;
- out.collect(new Edge<Long, Double>(key, target, 1.0));
- }
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Page Rank";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index b01aa23..78c535b 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -16,78 +16,52 @@
* limitations under the License.
*/
-package org.apache.flink.graph.example;
+package org.apache.flink.graph.library;
-import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
*/
-public class GSASingleSourceShortestPaths implements ProgramDescription {
+public class GSASingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
+ private final K srcVertexId;
+ private final Integer maxIterations;
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+ public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+ this.srcVertexId = srcVertexId;
+ this.maxIterations = maxIterations;
+ }
- Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+ @Override
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
- // Execute the GSA iteration
- Graph<Long, Double, Double> result = graph
+ return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
- new UpdateDistance(), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
-
- // emit result
- if(fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("GSA Single Source Shortest Paths");
- } else {
- singleSourceShortestPaths.print();
- }
-
+ new UpdateDistance<K>(), maxIterations);
}
@SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Double>{
+ public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
- private long srcId;
+ private K srcVertexId;
- public InitVertices(long srcId) {
- this.srcId = srcId;
+ public InitVerticesMapper(K srcId) {
+ this.srcVertexId = srcId;
}
- public Double map(Long id) {
- if (id.equals(srcId)) {
+ public Double map(Vertex<K, Double> value) {
+ if (value.f0.equals(srcVertexId)) {
return 0.0;
- }
- else {
- return Double.POSITIVE_INFINITY;
+ } else {
+ return Double.MAX_VALUE;
}
}
}
@@ -113,7 +87,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
};
@SuppressWarnings("serial")
- private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+ private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
public void apply(Double newDistance, Double oldDistance) {
if (newDistance < oldDistance) {
@@ -121,60 +95,4 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
}
}
}
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
-
- private static Long srcVertexId = 1l;
-
- private static String edgesInputPath = null;
-
- private static String outputPath = null;
-
- private static int maxIterations = 5;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- srcVertexId = Long.parseLong(args[0]);
- edgesInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
- } else {
- System.out.println("Executing GSASingle Source Shortest Paths example "
- + "with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
- " <input edges path> <output path> <num iterations>");
- }
- return true;
- }
-
- private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgesInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class, Double.class)
- .map(new Tuple3ToEdgeMap<Long, Double>());
- } else {
- return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
- }
- }
-
- @Override
- public String getDescription() {
- return "GSA Single Source Shortest Paths";
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
new file mode 100644
index 0000000..1648922
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An implementation of the label propagation algorithm. The iterative algorithm
+ * detects communities by propagating labels. In each iteration, a vertex adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations have been reached. Note that different
+ * initializations might lead to different results.
+ *
+ */
+@SuppressWarnings("serial")
+
+public class LabelPropagation<K extends Comparable<K>> implements GraphAlgorithm<K, Long, NullValue> {
+
+ private final int maxIterations;
+
+ public LabelPropagation(int maxIterations) {
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+
+ // iteratively adopt the most frequent label among the neighbors
+ // of each vertex
+ return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+ maxIterations);
+ }
+
+ /**
+ * Function that updates the value of a vertex by adopting the most frequent
+ * label among its in-neighbors
+ */
+ public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
+
+ public void updateVertex(Vertex<K, Long> vertex,
+ MessageIterator<Long> inMessages) {
+ Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+
+ long maxFrequency = 1;
+ long mostFrequentLabel = vertex.getValue();
+
+ // store the labels with their frequencies
+ for (Long msg : inMessages) {
+ if (labelsWithFrequencies.containsKey(msg)) {
+ long currentFreq = labelsWithFrequencies.get(msg);
+ labelsWithFrequencies.put(msg, currentFreq + 1);
+ } else {
+ labelsWithFrequencies.put(msg, 1L);
+ }
+ }
+ // select the most frequent label:
+ // if two or more labels have the same frequency,
+ // the node adopts the label with the highest value
+ for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
+ if (entry.getValue() == maxFrequency) {
+ // check the label value to break ties
+ if (entry.getKey() > mostFrequentLabel) {
+ mostFrequentLabel = entry.getKey();
+ }
+ } else if (entry.getValue() > maxFrequency) {
+ maxFrequency = entry.getValue();
+ mostFrequentLabel = entry.getKey();
+ }
+ }
+
+ // set the new vertex value
+ setNewVertexValue(mostFrequentLabel);
+ }
+ }
+
+ /**
+ * Sends the vertex label to all out-neighbors
+ */
+ public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+ public void sendMessages(Vertex<K, Long> vertex) {
+ sendMessageToAllNeighbors(vertex.getValue());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
deleted file mode 100644
index 0b0f4fc..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.types.NullValue;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * An implementation of the label propagation algorithm. The iterative algorithm
- * detects communities by propagating labels. In each iteration, a vertex adopts
- * the label that is most frequent among its neighbors' labels. Labels are
- * represented by Longs and we assume a total ordering among them, in order to
- * break ties. The algorithm converges when no vertex changes its value or the
- * maximum number of iterations have been reached. Note that different
- * initializations might lead to different results.
- *
- */
-@SuppressWarnings("serial")
-
-public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
- implements GraphAlgorithm<K, Long, NullValue> {
-
- private final int maxIterations;
-
- public LabelPropagationAlgorithm(int maxIterations) {
- this.maxIterations = maxIterations;
- }
-
- @Override
- public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
-
- // iteratively adopt the most frequent label among the neighbors
- // of each vertex
- return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
- maxIterations);
- }
-
- /**
- * Function that updates the value of a vertex by adopting the most frequent
- * label among its in-neighbors
- */
- public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
-
- public void updateVertex(Vertex<K, Long> vertex,
- MessageIterator<Long> inMessages) {
- Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
-
- long maxFrequency = 1;
- long mostFrequentLabel = vertex.getValue();
-
- // store the labels with their frequencies
- for (Long msg : inMessages) {
- if (labelsWithFrequencies.containsKey(msg)) {
- long currentFreq = labelsWithFrequencies.get(msg);
- labelsWithFrequencies.put(msg, currentFreq + 1);
- } else {
- labelsWithFrequencies.put(msg, 1L);
- }
- }
- // select the most frequent label: if two or more labels have the
- // same frequency,
- // the node adopts the label with the highest value
- for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
- if (entry.getValue() == maxFrequency) {
- // check the label value to break ties
- if (entry.getKey() > mostFrequentLabel) {
- mostFrequentLabel = entry.getKey();
- }
- } else if (entry.getValue() > maxFrequency) {
- maxFrequency = entry.getValue();
- mostFrequentLabel = entry.getKey();
- }
- }
-
- // set the new vertex value
- setNewVertexValue(mostFrequentLabel);
- }
- }
-
- /**
- * Sends the vertex label to all out-neighbors
- */
- public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
-
- public void sendMessages(Vertex<K, Long> vertex) {
- sendMessageToAllNeighbors(vertex.getValue());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
new file mode 100644
index 0000000..03cb740
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ */
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
+
+ private double beta;
+ private int maxIterations;
+
+ public PageRank(double beta, int maxIterations) {
+ this.beta = beta;
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+
+ final long numberOfVertices = network.numberOfVertices();
+
+ return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
+ new RankMessenger<K>(numberOfVertices), maxIterations);
+ }
+
+ /**
+ * Function that updates the rank of a vertex by summing up the partial
+ * ranks from all incoming messages and then applying the dampening formula.
+ */
+ @SuppressWarnings("serial")
+ public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+ private final double beta;
+ private final long numVertices;
+
+ public VertexRankUpdater(double beta, long numberOfVertices) {
+ this.beta = beta;
+ this.numVertices = numberOfVertices;
+ }
+
+ @Override
+ public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
+ double rankSum = 0.0;
+ for (double msg : inMessages) {
+ rankSum += msg;
+ }
+
+ // apply the dampening factor / random jump
+ double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+ setNewVertexValue(newRank);
+ }
+ }
+
+ /**
+ * Distributes the rank of a vertex among all target vertices according to
+ * the transition probability, which is associated with an edge as the edge
+ * value.
+ */
+ @SuppressWarnings("serial")
+ public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+ private final long numVertices;
+
+ public RankMessenger(long numberOfVertices) {
+ this.numVertices = numberOfVertices;
+ }
+
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex) {
+ if (getSuperstepNumber() == 1) {
+ // initialize vertex ranks
+ vertex.setValue(new Double(1.0 / numVertices));
+ }
+
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
deleted file mode 100644
index f63fb0d..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.io.Serializable;
-
-
-public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implements
- GraphAlgorithm<K, Double, Double> {
-
- private double beta;
- private int maxIterations;
-
- public PageRankAlgorithm(double beta, int maxIterations) {
- this.beta = beta;
- this.maxIterations = maxIterations;
- }
-
- @Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
-
- final long numberOfVertices = network.numberOfVertices();
- return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
- maxIterations);
- }
-
- /**
- * Function that updates the rank of a vertex by summing up the partial
- * ranks from all incoming messages and then applying the dampening formula.
- */
- @SuppressWarnings("serial")
- public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
- private final double beta;
- private final long numVertices;
-
- public VertexRankUpdater(double beta, long numberOfVertices) {
- this.beta = beta;
- this.numVertices = numberOfVertices;
- }
-
- @Override
- public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
- double rankSum = 0.0;
- for (double msg : inMessages) {
- rankSum += msg;
- }
-
- // apply the dampening factor / random jump
- double newRank = (beta * rankSum) + (1 - beta) / numVertices;
- setNewVertexValue(newRank);
- }
- }
-
- /**
- * Distributes the rank of a vertex among all target vertices according to
- * the transition probability, which is associated with an edge as the edge
- * value.
- */
- @SuppressWarnings("serial")
- public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
- private final long numVertices;
-
- public RankMessenger(long numberOfVertices) {
- this.numVertices = numberOfVertices;
- }
-
- @Override
- public void sendMessages(Vertex<K, Double> vertex) {
- if (getSuperstepNumber() == 1) {
- // initialize vertex ranks
- vertex.setValue(new Double(1.0 / numVertices));
- }
-
- for (Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..97ea000
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
+ */
+@SuppressWarnings("serial")
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
+
+ private final K srcVertexId;
+ private final Integer maxIterations;
+
+ public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+ this.srcVertexId = srcVertexId;
+ this.maxIterations = maxIterations;
+ }
+
+ @Override
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+
+ return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+ .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+ maxIterations);
+ }
+
+ public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
+
+ private K srcVertexId;
+
+ public InitVerticesMapper(K srcId) {
+ this.srcVertexId = srcId;
+ }
+
+ public Double map(Vertex<K, Double> value) {
+ if (value.f0.equals(srcVertexId)) {
+ return 0.0;
+ } else {
+ return Double.MAX_VALUE;
+ }
+ }
+ }
+
+ /**
+ * Function that updates the value of a vertex by picking the minimum
+ * distance from all incoming messages.
+ *
+ * @param <K>
+ */
+ public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+ @Override
+ public void updateVertex(Vertex<K, Double> vertex,
+ MessageIterator<Double> inMessages) {
+
+ Double minDistance = Double.MAX_VALUE;
+
+ for (double msg : inMessages) {
+ if (msg < minDistance) {
+ minDistance = msg;
+ }
+ }
+
+ if (vertex.getValue() > minDistance) {
+ setNewVertexValue(minDistance);
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum distance associated with a given vertex among all
+ * the target vertices summed up with the edge's value.
+ *
+ * @param <K>
+ */
+ public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+ @Override
+ public void sendMessages(Vertex<K, Double> vertex)
+ throws Exception {
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d2d061c3/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
deleted file mode 100644
index e78ae3e..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serializable>
- implements GraphAlgorithm<K, Double, Double> {
-
- private final K srcVertexId;
- private final Integer maxIterations;
-
- public SingleSourceShortestPathsAlgorithm(K srcVertexId, Integer maxIterations) {
- this.srcVertexId = srcVertexId;
- this.maxIterations = maxIterations;
- }
-
- @Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
-
- return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
- .runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
- maxIterations);
- }
-
- public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
-
- private K srcVertexId;
-
- public InitVerticesMapper(K srcId) {
- this.srcVertexId = srcId;
- }
-
- public Double map(Vertex<K, Double> value) {
- if (value.f0.equals(srcVertexId)) {
- return 0.0;
- } else {
- return Double.MAX_VALUE;
- }
- }
- }
-
- /**
- * Function that updates the value of a vertex by picking the minimum
- * distance from all incoming messages.
- *
- * @param <K>
- */
- public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
- @Override
- public void updateVertex(Vertex<K, Double> vertex,
- MessageIterator<Double> inMessages) {
-
- Double minDistance = Double.MAX_VALUE;
-
- for (double msg : inMessages) {
- if (msg < minDistance) {
- minDistance = msg;
- }
- }
-
- if (vertex.getValue() > minDistance) {
- setNewVertexValue(minDistance);
- }
- }
- }
-
- /**
- * Distributes the minimum distance associated with a given vertex among all
- * the target vertices summed up with the edge's value.
- *
- * @param <K>
- */
- public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
- @Override
- public void sendMessages(Vertex<K, Double> vertex)
- throws Exception {
- for (Edge<K, Double> edge : getEdges()) {
- sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
- }
- }
- }
-}
\ No newline at end of file
[2/4] flink git commit: [FLINK-2451] [gelly] removed redundant examples; added comments describing which gelly method each example illustrates.
Posted by va...@apache.org.
[FLINK-2451] [gelly] removed redundant examples; added comments describing which gelly method each example illustrates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/970ab35e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/970ab35e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/970ab35e
Branch: refs/heads/master
Commit: 970ab35ec8f0268ed38234f35fa50706c5ff3071
Parents: 3a83029
Author: vasia <va...@apache.org>
Authored: Mon Aug 3 14:37:19 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:46:20 2015 +0200
----------------------------------------------------------------------
.../flink/graph/example/CommunityDetection.java | 142 ------------
.../graph/example/ConnectedComponents.java | 11 +-
.../graph/example/EuclideanGraphWeighing.java | 3 +
.../graph/example/GSAConnectedComponents.java | 178 ---------------
.../apache/flink/graph/example/GSAPageRank.java | 215 -------------------
.../example/GSASingleSourceShortestPaths.java | 27 ++-
.../flink/graph/example/GraphMetrics.java | 3 +-
.../flink/graph/example/IncrementalSSSP.java | 6 +-
.../graph/example/JaccardSimilarityMeasure.java | 10 +-
.../flink/graph/example/LabelPropagation.java | 171 ---------------
.../flink/graph/example/MusicProfiles.java | 46 ++--
.../apache/flink/graph/example/PageRank.java | 153 -------------
.../example/SingleSourceShortestPaths.java | 106 +++++++--
.../graph/library/GSAConnectedComponents.java | 178 +++++++++++++++
.../apache/flink/graph/library/GSAPageRank.java | 215 +++++++++++++++++++
.../library/GSASingleSourceShortestPaths.java | 180 ++++++++++++++++
16 files changed, 721 insertions(+), 923 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
deleted file mode 100644
index e44e5bd..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.graph.library.CommunityDetectionAlgorithm;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm}
- * library method:
- * <ul>
- * <li> with the edge data set given as a parameter
- * <li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
- * 1-2 with weight 1.0 and 1-3 with weight 2.0.
- *
- * Usage <code>CommunityDetection <edge path> <result path>
- * <number of iterations> <delta></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.CommunityDetectionData}
- */
-public class CommunityDetection implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String [] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- // set up the execution environment
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // set up the graph
- DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
- Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
- new MapFunction<Long, Long>() {
-
- public Long map(Long label) {
- return label;
- }
- }, env);
-
- // the result is in the form of <vertexId, communityId>, where the communityId is the label
- // which the vertex converged to
- DataSet<Vertex<Long, Long>> communityVertices =
- graph.run(new CommunityDetectionAlgorithm(maxIterations, delta)).getVertices();
-
- // emit result
- if (fileOutput) {
- communityVertices.writeAsCsv(outputPath, "\n", ",");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("Executing Community Detection Example");
- } else {
- communityVertices.print();
- }
-
- }
-
- @Override
- public String getDescription() {
- return "Community Detection";
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static Integer maxIterations = CommunityDetectionData.MAX_ITERATIONS;
- private static Double delta = CommunityDetectionData.DELTA;
-
- private static boolean parseParameters(String [] args) {
- if(args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage CommunityDetection <edge path> <output path> " +
- "<num iterations> <delta>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- delta = Double.parseDouble(args[3]);
-
- } else {
- System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
- System.out.println("Provide parameters to read input data from files.");
- System.out.println("Usage CommunityDetection <edge path> <output path> " +
- "<num iterations> <delta>");
- }
-
- return true;
- }
-
- private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .ignoreComments("#")
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class, Double.class)
- .map(new Tuple3ToEdgeMap<Long, Double>());
- } else {
- return CommunityDetectionData.getDefaultEdgeDataSet(env);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 3443a55..b841ced 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -31,12 +31,11 @@ import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
import org.apache.flink.types.NullValue;
/**
- * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
- * library method:
- * <ul>
- * <li> with the edge data set given as a parameter
- * <li> with default data
- * </ul>
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in {@link org.apache.flink.graph.library}.
+ *
+ * In particular, this example uses the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * library method to compute the connected components of the input graph.
*
* The input file is a plain text file and must be formatted as follows:
* Edges are represented by tuples of srcVertexId, trgVertexId which are
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
index 7e2c057..b7e3385 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -33,6 +33,9 @@ import org.apache.flink.graph.example.utils.EuclideanGraphData;
import java.io.Serializable;
/**
+ * This example shows how to use Gelly's {@link Graph#getTriplets()} and
+ * {@link Graph#joinWithEdges(DataSet, MapFunction)} methods.
+ *
 * Given a directed, unweighted graph, with vertex values representing points in a plane,
* return a weighted graph where the edge weights are equal to the Euclidean distance between the
* src and the trg vertex values.
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
deleted file mode 100755
index 9b75c92..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
- */
-public class GSAConnectedComponents implements ProgramDescription {
-
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
- // Execute the GSA iteration
- Graph<Long, Long, NullValue> result =
- graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
- new UpdateComponentId(), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
-
- // emit result
- if (fileOutput) {
- connectedComponents.writeAsCsv(outputPath, "\n", " ");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("GSA Connected Components");
- } else {
- connectedComponents.print();
- }
-
- }
-
- @SuppressWarnings("serial")
- private static final class InitVertices implements MapFunction<Long, Long> {
-
- public Long map(Long vertexId) {
- return vertexId;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Connected Components UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
- public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getNeighborValue();
- }
- };
-
- @SuppressWarnings("serial")
- private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
- public Long sum(Long newValue, Long currentValue) {
- return Math.min(newValue, currentValue);
- }
- };
-
- @SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
- public void apply(Long summedValue, Long origValue) {
- if (summedValue < origValue) {
- setResult(summedValue);
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static int maxIterations = 16;
-
- private static boolean parseParameters(String[] args) {
-
- if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if (args.length != 3) {
- System.err.println("Usage: GSAConnectedComponents <edge path> " +
- "<result path> <max iterations>");
- return false;
- }
-
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing GSA Connected Components example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- }
-
- // Generates 3 components of size 2
- return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
- @Override
- public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
- out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Connected Components";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
deleted file mode 100644
index 45d4555..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and no
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices
- * and random edge weights.
- */
-public class GSAPageRank implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
-
- Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
- @Override
- public Double map(Long value) throws Exception {
- return 1.0;
- }
- }, env);
-
- DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
- // Assign the transition probabilities as the edge weights
- Graph<Long, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees,
- new MapFunction<Tuple2<Double, Long>, Double>() {
-
- @Override
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
- }
- });
-
- long numberOfVertices = networkWithWeights.numberOfVertices();
-
- // Execute the GSA iteration
- Graph<Long, Double, Double> result = networkWithWeights
- .runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
- new UpdateRanks(numberOfVertices), maxIterations);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> pageRanks = result.getVertices();
-
- // emit result
- if (fileOutput) {
- pageRanks.writeAsCsv(outputPath, "\n", "\t");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute("GSA Page Ranks");
- } else {
- pageRanks.print();
- }
-
- }
-
- // --------------------------------------------------------------------------------------------
- // Page Rank UDFs
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("serial")
- private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
-
- long numberOfVertices;
-
- public GatherRanks(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- @Override
- public Double gather(Neighbor<Double, Double> neighbor) {
- double neighborRank = neighbor.getNeighborValue();
-
- if(getSuperstepNumber() == 1) {
- neighborRank = 1.0 / numberOfVertices;
- }
-
- return neighborRank * neighbor.getEdgeValue();
- }
- }
-
- @SuppressWarnings("serial")
- private static final class SumRanks extends SumFunction<Double, Double, Double> {
-
- @Override
- public Double sum(Double newValue, Double currentValue) {
- return newValue + currentValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class UpdateRanks extends ApplyFunction<Long, Double, Double> {
-
- long numberOfVertices;
-
- public UpdateRanks(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
- @Override
- public void apply(Double rankSum, Double currentValue) {
- setResult((1-DAMPENING_FACTOR)/numberOfVertices + DAMPENING_FACTOR * rankSum);
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static final double DAMPENING_FACTOR = 0.85;
- private static long numPages = 10;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static int maxIterations = 10;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage: GSAPageRank <input edges path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing GSAPageRank example with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAPageRank <input edges path> <output path> <num iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
- public Edge<Long, Double> map(Tuple2<Long, Long> input) {
- return new Edge<Long, Double>(input.f0, input.f1, 1.0);
- }
- }).withForwardedFields("f0; f1");
- }
-
- return env.generateSequence(1, numPages).flatMap(
- new FlatMapFunction<Long, Edge<Long, Double>>() {
- @Override
- public void flatMap(Long key,
- Collector<Edge<Long, Double>> out) throws Exception {
- int numOutEdges = (int) (Math.random() * (numPages / 2));
- for (int i = 0; i < numOutEdges; i++) {
- long target = (long) (Math.random() * numPages) + 1;
- out.collect(new Edge<Long, Double>(key, target, 1.0));
- }
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Page Rank";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index b01aa23..23a3a82 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -33,7 +33,19 @@ import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ * This example shows how to use Gelly's Gather-Sum-Apply iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
*/
public class GSASingleSourceShortestPaths implements ProgramDescription {
@@ -54,9 +66,8 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
// Execute the GSA iteration
- Graph<Long, Double, Double> result = graph
- .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
- new UpdateDistance(), maxIterations);
+ Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
+ new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -73,6 +84,10 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
}
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
@SuppressWarnings("serial")
private static final class InitVertices implements MapFunction<Long, Double>{
@@ -92,10 +107,6 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
}
}
- // --------------------------------------------------------------------------------------------
- // Single Source Shortest Path UDFs
- // --------------------------------------------------------------------------------------------
-
@SuppressWarnings("serial")
private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index c6a776d..6c4d0c2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -30,8 +30,9 @@ import org.apache.flink.graph.example.utils.ExampleUtils;
import org.apache.flink.types.NullValue;
/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.
*
- * A simple example to illustrate the basic functionality of the graph-api.
* The program creates a random graph and computes and prints
* the following metrics:
* - number of vertices
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index 8d94cbc..cc672b2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -35,11 +35,11 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
- * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
- * upon edge removal.
- *
* This example illustrates the usage of vertex-centric iteration's
* messaging direction configuration option.
+ *
+ * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
+ * upon edge removal.
*
* The program takes as input the resulted graph after a SSSP computation,
* an edge to be removed and the initial graph(i.e. before SSSP was computed).
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index 79de407..0f84dbb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -34,6 +34,13 @@ import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
import java.util.HashSet;
/**
+ * This example shows how to use
+ * <ul>
+ * <li> neighborhood methods
+ * <li> join with vertices
+ * <li> triplets
+ * </ul>
+ *
* Given a directed, unweighted graph, return a weighted graph where the edge values are equal
 * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
* of the union of neighbor sets - for the src and target vertices.
@@ -117,8 +124,7 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
@Override
- public HashSet<Long> reduceNeighbors(HashSet<Long> first,
- HashSet<Long> second) {
+ public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
first.addAll(second);
return new HashSet<Long>(first);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
deleted file mode 100644
index bee5af3..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.LabelPropagationAlgorithm;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses the label propagation algorithm to detect communities by
- * propagating labels. Initially, each vertex is assigned its id as its label.
- * The vertices iteratively propagate their labels to their neighbors and adopt
- * the most frequent label among their neighbors. The algorithm converges when
- * no vertex changes value or the maximum number of iterations have been
- * reached.
- *
- * The edges input file is expected to contain one edge per line, with long IDs
- * in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * The vertices input file is expected to contain one vertex per line, with long IDs
- * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
- *
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- */
-public class LabelPropagation implements ProgramDescription {
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- // Set up the execution environment
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // Set up the graph
- DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
- DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
-
- // Set up the program
- DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
- new LabelPropagationAlgorithm<Long>(maxIterations)).getVertices();
-
- // Emit results
- if(fileOutput) {
- verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
-
- // Execute the program
- env.execute("Label Propagation Example");
- } else {
- verticesWithCommunity.print();
- }
-
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String vertexInputPath = null;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static long numVertices = 100;
- private static int maxIterations = 10;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- if(args.length != 4) {
- System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- vertexInputPath = args[0];
- edgeInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
- } else {
- System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
-
- if (fileOutput) {
- return env.readCsvFile(vertexInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new Tuple2ToVertexMap<Long, Long>());
- }
-
- return env.generateSequence(1, numVertices).map(
- new MapFunction<Long, Vertex<Long, Long>>() {
- public Vertex<Long, Long> map(Long l) throws Exception {
- return new Vertex<Long, Long>(l, l);
- }
- });
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
- @Override
- public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
- return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
- }
- });
- }
-
- return env.generateSequence(1, numVertices).flatMap(
- new FlatMapFunction<Long, Edge<Long, NullValue>>() {
- @Override
- public void flatMap(Long key,
- Collector<Edge<Long, NullValue>> out) {
- int numOutEdges = (int) (Math.random() * (numVertices / 2));
- for (int i = 0; i < numOutEdges; i++) {
- long target = (long) (Math.random() * numVertices) + 1;
- out.collect(new Edge<Long, NullValue>(key, target,
- NullValue.getInstance()));
- }
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "Label Propagation Example";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 0fc45bd..7643976 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -42,32 +42,32 @@ import org.apache.flink.graph.library.LabelPropagationAlgorithm;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
+/**
+ * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+ * The input is a set of <userId - songId - playCount> triplets and
+ * a set of bad records, i.e. song ids that should not be trusted.
+ * Initially, we use the DataSet API to filter out the bad records.
+ * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
+ * the top song (most listened) per user.
+ * Then, we use the DataSet API again, to create a user-user similarity graph,
+ * based on common songs, where users that are listeners of the same song
+ * are connected. A user-defined threshold on the playcount value
+ * defines when a user is considered to be a listener of a song.
+ * Finally, we use the graph API to run the label propagation community detection algorithm on
+ * the similarity graph.
+ *
+ * The triplets input is expected to be given as one triplet per line,
+ * in the following format: "<userID>\t<songID>\t<playcount>".
+ *
+ * The mismatches input file is expected to contain one mismatch record per line,
+ * in the following format:
+ * "ERROR: <songID trackID> song_title"
+ *
+ * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
+ */
@SuppressWarnings("serial")
public class MusicProfiles implements ProgramDescription {
- /**
- * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
- * The input is a set <userId - songId - playCount> triplets and
- * a set of bad records, i.e. song ids that should not be trusted.
- * Initially, we use the DataSet API to filter out the bad records.
- * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
- * the top song (most listened) per user.
- * Then, we use the DataSet API again, to create a user-user similarity graph,
- * based on common songs, where users that are listeners of the same song
- * are connected. A user-defined threshold on the playcount value
- * defines when a user is considered to be a listener of a song.
- * Finally, we use the graph API to run the label propagation community detection algorithm on
- * the similarity graph.
- *
- * The triplets input is expected to be given as one triplet per line,
- * in the following format: "<userID>\t<songID>\t<playcount>".
- *
- * The mismatches input file is expected to contain one mismatch record per line,
- * in the following format:
- * "ERROR: <songID trackID> song_title"
- *
- * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
- */
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
deleted file mode 100644
index 10b4be4..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.PageRankAlgorithm;
-import org.apache.flink.util.Collector;
-
-/**
- * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
- *
- * The edges input file is expected to contain one edge per line, with long IDs and no
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
- *
- * If no arguments are provided, the example runs with a random graph of 10 vertices.
- *
- */
-public class PageRank implements ProgramDescription {
-
- @SuppressWarnings("serial")
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
-
- Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
-
- public Double map(Long value) throws Exception {
- return 1.0;
- }
- }, env);
-
- DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-
- // assign the transition probabilities as the edge weights
- Graph<Long, Double, Double> networkWithWeights = network
- .joinWithEdgesOnSource(vertexOutDegrees,
- new MapFunction<Tuple2<Double, Long>, Double>() {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
- }
- });
-
- DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run(
- new PageRankAlgorithm<Long>(DAMPENING_FACTOR, maxIterations))
- .getVertices();
-
- if (fileOutput) {
- pageRanks.writeAsCsv(outputPath, "\n", "\t");
-
- // since file sinks are lazy, we trigger the execution explicitly
- env.execute();
- } else {
- pageRanks.print();
- }
-
- }
-
- @Override
- public String getDescription() {
- return "PageRank example";
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static final double DAMPENING_FACTOR = 0.85;
- private static long numPages = 10;
- private static String edgeInputPath = null;
- private static String outputPath = null;
- private static int maxIterations = 10;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- if(args.length != 3) {
- System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
- return false;
- }
-
- fileOutput = true;
- edgeInputPath = args[0];
- outputPath = args[1];
- maxIterations = Integer.parseInt(args[2]);
- } else {
- System.out.println("Executing PageRank example with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: PageRank <input edges path> <output path> <num iterations>");
- }
- return true;
- }
-
- @SuppressWarnings("serial")
- private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-
- if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter("\t")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
- public Edge<Long, Double> map(Tuple2<Long, Long> input) {
- return new Edge<Long, Double>(input.f0, input.f1, 1.0);
- }
- }).withForwardedFields("f0; f1");
- }
-
- return env.generateSequence(1, numPages).flatMap(
- new FlatMapFunction<Long, Edge<Long, Double>>() {
- @Override
- public void flatMap(Long key,
- Collector<Edge<Long, Double>> out) throws Exception {
- int numOutEdges = (int) (Math.random() * (numPages / 2));
- for (int i = 0; i < numOutEdges; i++) {
- long target = (long) (Math.random() * numPages) + 1;
- out.collect(new Edge<Long, Double>(key, target, 1.0));
- }
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index 9d7d2c2..391ebaf 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -26,23 +26,28 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.library.SingleSourceShortestPathsAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
- * This example implements the Single Source Shortest Paths algorithm,
- * using a vertex-centric iteration.
+ * This example shows how to use Gelly's vertex-centric iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}.
*
- * The input file is expected to contain one edge per line, with long IDs
- * and double weights, separated by tabs, in the following format:
- * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
- *
- * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
*
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
*/
public class SingleSourceShortestPaths implements ProgramDescription {
- @SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
if (!parseParameters(args)) {
@@ -53,17 +58,14 @@ public class SingleSourceShortestPaths implements ProgramDescription {
DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
- Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
- new MapFunction<Long, Double>() {
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
- public Double map(Long value) {
- return Double.MAX_VALUE;
- }
- }, env);
+ // Execute the vertex-centric iteration
+ Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
+ new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
- DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
- .run(new SingleSourceShortestPathsAlgorithm<Long>(srcVertexId, maxIterations))
- .getVertices();
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
// emit result
if (fileOutput) {
@@ -77,9 +79,66 @@ public class SingleSourceShortestPaths implements ProgramDescription {
}
- @Override
- public String getDescription() {
- return "Single Source Shortest Paths";
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Double>{
+
+ private long srcId;
+
+ public InitVertices(long srcId) {
+ this.srcId = srcId;
+ }
+
+ public Double map(Long id) {
+ if (id.equals(srcId)) {
+ return 0.0;
+ }
+ else {
+ return Double.POSITIVE_INFINITY;
+ }
+ }
+ }
+
+ /**
+ * Function that updates the value of a vertex by picking the minimum
+ * distance from all incoming messages.
+ */
+ @SuppressWarnings("serial")
+ public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+ @Override
+ public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
+
+ Double minDistance = Double.MAX_VALUE;
+
+ for (double msg : inMessages) {
+ if (msg < minDistance) {
+ minDistance = msg;
+ }
+ }
+
+ if (vertex.getValue() > minDistance) {
+ setNewVertexValue(minDistance);
+ }
+ }
+ }
+
+ /**
+ * Distributes the minimum distance associated with a given vertex among all
+ * the target vertices summed up with the edge's value.
+ */
+ @SuppressWarnings("serial")
+ public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+ @Override
+ public void sendMessages(Vertex<Long, Double> vertex) {
+ for (Edge<Long, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+ }
+ }
}
// ******************************************************************************************************************
@@ -132,4 +191,9 @@ public class SingleSourceShortestPaths implements ProgramDescription {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
}
+
+ @Override
+ public String getDescription() {
+ return "Vertex-centric Single Source Shortest Paths";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
new file mode 100755
index 0000000..9b75c92
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ * Each vertex starts in its own component (component id == vertex id); in every superstep a vertex
+ * adopts the minimum id among its own and its neighbors', until no component id changes any more.
+ */
+public class GSAConnectedComponents implements ProgramDescription {
+
+ // --------------------------------------------------------------------------------------------
+ // Program
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+ // initialize every vertex value with the vertex id itself
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+ // Execute the GSA iteration
+ Graph<Long, Long, NullValue> result =
+ graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+ new UpdateComponentId(), maxIterations);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+ // emit result
+ if (fileOutput) {
+ connectedComponents.writeAsCsv(outputPath, "\n", " ");
+
+ // since file sinks are lazy, we trigger the execution explicitly
+ env.execute("GSA Connected Components");
+ } else {
+ connectedComponents.print();
+ }
+
+ }
+
+ /** Initializes each vertex value (component id) with the vertex id itself. */
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Long> {
+
+ @Override
+ public Long map(Long vertexId) {
+ return vertexId;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Connected Components UDFs
+ // --------------------------------------------------------------------------------------------
+
+ /** Gathers the current component id of each neighbor. */
+ @SuppressWarnings("serial")
+ private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+ @Override
+ public Long gather(Neighbor<Long, NullValue> neighbor) {
+ return neighbor.getNeighborValue();
+ }
+ }
+
+ /** Reduces the gathered neighbor ids to the minimum one. */
+ @SuppressWarnings("serial")
+ private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+ @Override
+ public Long sum(Long newValue, Long currentValue) {
+ return Math.min(newValue, currentValue);
+ }
+ }
+
+ /** Adopts the smaller component id; an unchanged value deactivates the vertex. */
+ @SuppressWarnings("serial")
+ private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+ @Override
+ public void apply(Long summedValue, Long origValue) {
+ if (summedValue < origValue) {
+ setResult(summedValue);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Util methods
+ // --------------------------------------------------------------------------------------------
+
+ private static boolean fileOutput = false;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+
+ private static int maxIterations = 16;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+
+ if (args.length != 3) {
+ System.err.println("Usage: GSAConnectedComponents <edge path> " +
+ "<result path> <max iterations>");
+ return false;
+ }
+
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
+ } else {
+ System.out.println("Executing GSA Connected Components example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
+ }
+ return true;
+ }
+
+ @SuppressWarnings("serial")
+ private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ @Override
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+ return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+ }
+ });
+ }
+
+ // Generates 3 components of size 2
+ return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+ @Override
+ public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+ out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+ }
+ });
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Connected Components";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
new file mode 100644
index 0000000..45d4555
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and no
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 10 vertices
+ * and random edge weights.
+ */
+public class GSAPageRank implements ProgramDescription {
+
+ // suppresses the serial warnings of the anonymous functions defined inside main
+ @SuppressWarnings("serial")
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+
+ // initialize every vertex rank with 1.0
+ Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
+
+ @Override
+ public Double map(Long value) throws Exception {
+ return 1.0;
+ }
+ }, env);
+
+ DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+
+ // Assign the transition probabilities as the edge weights
+ Graph<Long, Double, Double> networkWithWeights = network
+ .joinWithEdgesOnSource(vertexOutDegrees,
+ new MapFunction<Tuple2<Double, Long>, Double>() {
+
+ @Override
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ });
+
+ long numberOfVertices = networkWithWeights.numberOfVertices();
+
+ // Execute the GSA iteration
+ Graph<Long, Double, Double> result = networkWithWeights
+ .runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
+ new UpdateRanks(numberOfVertices), maxIterations);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Double>> pageRanks = result.getVertices();
+
+ // emit result
+ if (fileOutput) {
+ pageRanks.writeAsCsv(outputPath, "\n", "\t");
+
+ // since file sinks are lazy, we trigger the execution explicitly
+ env.execute("GSA Page Ranks");
+ } else {
+ pageRanks.print();
+ }
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Page Rank UDFs
+ // --------------------------------------------------------------------------------------------
+
+ /** Gathers the weighted rank contribution coming through each neighbor. */
+ @SuppressWarnings("serial")
+ private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
+
+ long numberOfVertices;
+
+ public GatherRanks(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ @Override
+ public Double gather(Neighbor<Double, Double> neighbor) {
+ double neighborRank = neighbor.getNeighborValue();
+
+ // in the first superstep, ranks are re-initialized to the uniform distribution
+ if (getSuperstepNumber() == 1) {
+ neighborRank = 1.0 / numberOfVertices;
+ }
+
+ return neighborRank * neighbor.getEdgeValue();
+ }
+ }
+
+ /** Sums up the partial rank contributions. */
+ @SuppressWarnings("serial")
+ private static final class SumRanks extends SumFunction<Double, Double, Double> {
+
+ @Override
+ public Double sum(Double newValue, Double currentValue) {
+ return newValue + currentValue;
+ }
+ }
+
+ /** Applies the damping factor to the summed rank. */
+ @SuppressWarnings("serial")
+ private static final class UpdateRanks extends ApplyFunction<Long, Double, Double> {
+
+ long numberOfVertices;
+
+ public UpdateRanks(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ @Override
+ public void apply(Double rankSum, Double currentValue) {
+ // standard PageRank formula: (1 - d) / N + d * sum(incoming contributions)
+ setResult((1 - DAMPENING_FACTOR) / numberOfVertices + DAMPENING_FACTOR * rankSum);
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static final double DAMPENING_FACTOR = 0.85;
+ private static long numPages = 10;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+ private static int maxIterations = 10;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ if (args.length != 3) {
+ System.err.println("Usage: GSAPageRank <input edges path> <output path> <num iterations>");
+ return false;
+ }
+
+ fileOutput = true;
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
+ } else {
+ System.out.println("Executing GSAPageRank example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: GSAPageRank <input edges path> <output path> <num iterations>");
+ }
+ return true;
+ }
+
+ @SuppressWarnings("serial")
+ private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+
+ if (fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+ @Override
+ public Edge<Long, Double> map(Tuple2<Long, Long> input) {
+ return new Edge<Long, Double>(input.f0, input.f1, 1.0);
+ }
+ }).withForwardedFields("f0; f1");
+ }
+
+ // default data: a random graph of numPages vertices with unit edge weights
+ return env.generateSequence(1, numPages).flatMap(
+ new FlatMapFunction<Long, Edge<Long, Double>>() {
+ @Override
+ public void flatMap(Long key,
+ Collector<Edge<Long, Double>> out) throws Exception {
+ int numOutEdges = (int) (Math.random() * (numPages / 2));
+ for (int i = 0; i < numOutEdges; i++) {
+ long target = (long) (Math.random() * numPages) + 1;
+ out.collect(new Edge<Long, Double>(key, target, 1.0));
+ }
+ }
+ });
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Page Rank";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/970ab35e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..b01aa23
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
+ * The source vertex starts at distance 0.0 and all other vertices at infinity; each superstep relaxes
+ * the tentative distances along the weighted edges until no distance improves any more.
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+ // --------------------------------------------------------------------------------------------
+ // Program
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+ // Execute the GSA iteration
+ Graph<Long, Double, Double> result = graph
+ .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+ new UpdateDistance(), maxIterations);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+ // emit result
+ if (fileOutput) {
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+
+ // since file sinks are lazy, we trigger the execution explicitly
+ env.execute("GSA Single Source Shortest Paths");
+ } else {
+ singleSourceShortestPaths.print();
+ }
+
+ }
+
+ /** Sets the source vertex distance to 0.0 and every other vertex to infinity. */
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Double> {
+
+ private long srcId;
+
+ public InitVertices(long srcId) {
+ this.srcId = srcId;
+ }
+
+ @Override
+ public Double map(Long id) {
+ if (id.equals(srcId)) {
+ return 0.0;
+ }
+ else {
+ return Double.POSITIVE_INFINITY;
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ /** Gathers the candidate distance through each neighbor (neighbor distance + edge weight). */
+ @SuppressWarnings("serial")
+ private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+ @Override
+ public Double gather(Neighbor<Double, Double> neighbor) {
+ return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+ }
+ }
+
+ /** Reduces the gathered candidates to the minimum distance. */
+ @SuppressWarnings("serial")
+ private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+ @Override
+ public Double sum(Double newValue, Double currentValue) {
+ return Math.min(newValue, currentValue);
+ }
+ }
+
+ /** Adopts the new distance only when it strictly improves; otherwise the vertex stays inactive. */
+ @SuppressWarnings("serial")
+ private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+ @Override
+ public void apply(Double newDistance, Double oldDistance) {
+ if (newDistance < oldDistance) {
+ setResult(newDistance);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Util methods
+ // --------------------------------------------------------------------------------------------
+
+ private static boolean fileOutput = false;
+
+ private static Long srcVertexId = 1L;
+
+ private static String edgesInputPath = null;
+
+ private static String outputPath = null;
+
+ private static int maxIterations = 5;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ if (args.length != 4) {
+ System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
+ return false;
+ }
+
+ fileOutput = true;
+ srcVertexId = Long.parseLong(args[0]);
+ edgesInputPath = args[1];
+ outputPath = args[2];
+ maxIterations = Integer.parseInt(args[3]);
+ } else {
+ System.out.println("Executing GSA Single Source Shortest Paths example "
+ + "with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInputPath)
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new Tuple3ToEdgeMap<Long, Double>());
+ } else {
+ return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Single Source Shortest Paths";
+ }
+}
[4/4] flink git commit: [gelly] made the number of vertices an
optional parameter of PageRank;
added the edge weight initialization to the library methods
Posted by va...@apache.org.
[gelly] made the number of vertices an optional parameter of PageRank; added the edge weight initialization to the library methods
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ae84273
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ae84273
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ae84273
Branch: refs/heads/master
Commit: 5ae84273ce60899ba118b8d21b40587a71515f9b
Parents: 8f35988
Author: vasia <va...@apache.org>
Authored: Sat Aug 8 13:36:47 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:47:00 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Graph.java | 1 -
.../apache/flink/graph/library/GSAPageRank.java | 37 ++++++++++++++-
.../apache/flink/graph/library/PageRank.java | 44 +++++++++++++++++-
.../graph/test/library/PageRankITCase.java | 47 +++++++++++---------
4 files changed, 104 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 8552c01..ff0ec24 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1145,7 +1145,6 @@ public class Graph<K, VV, EV> {
* @return the new graph containing the existing vertices and edges plus the
* newly added edge
*/
- @SuppressWarnings("unchecked")
public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 4299381..4adaaa9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -18,6 +18,9 @@
package org.apache.flink.graph.library;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.gsa.ApplyFunction;
@@ -27,23 +30,46 @@ import org.apache.flink.graph.gsa.SumFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
private double beta;
private int maxIterations;
+ private long numberOfVertices;
+ /**
+ * @param beta the damping factor
+ * @param maxIterations the maximum number of iterations
+ */
public GSAPageRank(double beta, int maxIterations) {
this.beta = beta;
this.maxIterations = maxIterations;
}
+ public GSAPageRank(double beta, long numVertices, int maxIterations) {
+ this.beta = beta;
+ this.numberOfVertices = numVertices;
+ this.maxIterations = maxIterations;
+ }
+
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
- final long numberOfVertices = network.numberOfVertices();
+ if (numberOfVertices == 0) {
+ numberOfVertices = network.numberOfVertices();
+ }
+
+ DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
- return network.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
+ Graph<K, Double, Double> networkWithWeights = network
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
}
@@ -97,4 +123,11 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double> {
setResult((1-beta)/numVertices + beta * rankSum);
}
}
+
+ @SuppressWarnings("serial")
+ private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 03cb740..93b10eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -18,6 +18,9 @@
package org.apache.flink.graph.library;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
@@ -28,23 +31,52 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
private double beta;
private int maxIterations;
+ private long numberOfVertices;
+ /**
+ * @param beta the damping factor
+ * @param maxIterations the maximum number of iterations
+ */
public PageRank(double beta, int maxIterations) {
this.beta = beta;
this.maxIterations = maxIterations;
+ this.numberOfVertices = 0;
+ }
+
+ /**
+ * @param beta the damping factor
+ * @param maxIterations the maximum number of iterations
+ * @param numVertices the number of vertices in the input
+ */
+ public PageRank(double beta, long numVertices, int maxIterations) {
+ this.beta = beta;
+ this.maxIterations = maxIterations;
+ this.numberOfVertices = numVertices;
}
@Override
public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
- final long numberOfVertices = network.numberOfVertices();
+ if (numberOfVertices == 0) {
+ numberOfVertices = network.numberOfVertices();
+ }
+
+ DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
- return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
+ Graph<K, Double, Double> networkWithWeights = network
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
new RankMessenger<K>(numberOfVertices), maxIterations);
}
@@ -102,4 +134,12 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double> {
}
}
}
+
+ @SuppressWarnings("serial")
+ private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ae84273/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index cc1132d..cc0327f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -22,9 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.PageRankData;
@@ -51,13 +49,8 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
- DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
- Graph<Long, Double, Double> networkWithWeights = inputGraph
- .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
- List<Vertex<Long, Double>> result = networkWithWeights.run(new PageRank<Long>(0.85, 3))
+ List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
.getVertices().collect();
compareWithDelta(result, expectedResult, 0.01);
@@ -69,13 +62,34 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
- DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
- Graph<Long, Double, Double> networkWithWeights = inputGraph
- .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+ List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ @Test
+ public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ @Test
+ public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- List<Vertex<Long, Double>> result = networkWithWeights.run(new GSAPageRank<Long>(0.85, 3))
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
.getVertices().collect();
compareWithDelta(result, expectedResult, 0.01);
@@ -115,11 +129,4 @@ public class PageRankITCase extends MultipleProgramsTestBase {
return 1.0;
}
}
-
- @SuppressWarnings("serial")
- private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
- public Double map(Tuple2<Double, Long> value) {
- return value.f0 / value.f1;
- }
- }
}
[3/4] flink git commit: [FLINK-2451] [gelly] re-organized tests;
compare with collect() instead of temp files where possible
Posted by va...@apache.org.
[FLINK-2451] [gelly] re-organized tests; compare with collect() instead of temp files where possible
This closes #1000
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f35988f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f35988f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f35988f
Branch: refs/heads/master
Commit: 8f35988fc3edfeda8044a6675d04f111596b8e31
Parents: d2d061c
Author: vasia <va...@apache.org>
Authored: Fri Aug 7 11:28:20 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:46:49 2015 +0200
----------------------------------------------------------------------
.../example/GSASingleSourceShortestPaths.java | 2 +-
.../example/utils/CommunityDetectionData.java | 30 ++++
.../example/utils/LabelPropagationData.java | 110 ++++++++++++++
.../flink/graph/example/utils/PageRankData.java | 33 ++++-
.../flink/graph/test/GatherSumApplyITCase.java | 111 +++++++-------
.../test/example/CommunityDetectionITCase.java | 100 -------------
...ctedComponentsWithRandomisedEdgesITCase.java | 95 ------------
.../test/example/LabelPropagationITCase.java | 143 -------------------
.../graph/test/example/PageRankITCase.java | 78 ----------
.../SingleSourceShortestPathsITCase.java | 9 ++
.../test/library/CommunityDetectionITCase.java | 82 +++++++++++
...ctedComponentsWithRandomisedEdgesITCase.java | 95 ++++++++++++
.../test/library/LabelPropagationITCase.java | 78 ++++++++++
.../graph/test/library/PageRankITCase.java | 125 ++++++++++++++++
14 files changed, 611 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index 23a3a82..9ea8fe2 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -74,7 +74,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
// emit result
if(fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
// since file sinks are lazy, we trigger the execution explicitly
env.execute("GSA Single Source Shortest Paths");
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
index 629f5ef..196de3a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -36,6 +36,11 @@ public class CommunityDetectionData {
public static final double DELTA = 0.5f;
+ public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + "2,6\n"
+ + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+
+ public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
@@ -61,5 +66,30 @@ public class CommunityDetectionData {
return env.fromCollection(edges);
}
+ public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+ edges.add(new Edge<Long, Double>(1L, 5L, 4.0));
+ edges.add(new Edge<Long, Double>(2L, 6L, 5.0));
+ edges.add(new Edge<Long, Double>(6L, 7L, 6.0));
+ edges.add(new Edge<Long, Double>(6L, 8L, 7.0));
+ edges.add(new Edge<Long, Double>(7L, 8L, 8.0));
+
+ return env.fromCollection(edges);
+ }
+
private CommunityDetectionData() {}
+
+ public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) {
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 5L, 1.0));
+
+ return env.fromCollection(edges);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
new file mode 100644
index 0000000..b70a9c4
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+public class LabelPropagationData {
+
+ public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,40\n" +
+ "5,40\n" +
+ "6,40\n" +
+ "7,40\n";
+
+ public static final String LABELS_WITH_TIE ="1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,10\n" +
+ "5,20\n" +
+ "6,20\n" +
+ "7,20\n" +
+ "8,20\n" +
+ "9,20\n";
+
+ private LabelPropagationData() {}
+
+ public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ vertices.add(new Vertex<Long, Long>(1l, 10l));
+ vertices.add(new Vertex<Long, Long>(2l, 10l));
+ vertices.add(new Vertex<Long, Long>(3l, 30l));
+ vertices.add(new Vertex<Long, Long>(4l, 40l));
+ vertices.add(new Vertex<Long, Long>(5l, 40l));
+ vertices.add(new Vertex<Long, Long>(6l, 40l));
+ vertices.add(new Vertex<Long, Long>(7l, 40l));
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+ edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance()));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ vertices.add(new Vertex<Long, Long>(1l, 10l));
+ vertices.add(new Vertex<Long, Long>(2l, 10l));
+ vertices.add(new Vertex<Long, Long>(3l, 10l));
+ vertices.add(new Vertex<Long, Long>(4l, 10l));
+ vertices.add(new Vertex<Long, Long>(5l, 0l));
+ vertices.add(new Vertex<Long, Long>(6l, 20l));
+ vertices.add(new Vertex<Long, Long>(7l, 20l));
+ vertices.add(new Vertex<Long, Long>(8l, 20l));
+ vertices.add(new Vertex<Long, Long>(9l, 20l));
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+ edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance()));
+
+ return env.fromCollection(edges);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
index 077572e..c84808a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -18,6 +18,13 @@
package org.apache.flink.graph.example.utils;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
public class PageRankData {
public static final String EDGES = "2 1\n" +
@@ -31,12 +38,28 @@ public class PageRankData {
"3 5\n";
- public static final String RANKS_AFTER_3_ITERATIONS = "1 0.237\n" +
- "2 0.248\n" +
- "3 0.173\n" +
- "4 0.175\n" +
- "5 0.165\n";
+ public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+ "2,0.248\n" +
+ "3,0.173\n" +
+ "4,0.175\n" +
+ "5,0.165\n";
private PageRankData() {}
+
+ public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
+ edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
+
+ return env.fromCollection(edges);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index a883fa0..4b381b6 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -18,20 +18,21 @@
package org.apache.flink.graph.test;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAConnectedComponents;
-import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.flink.types.NullValue;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.File;
+import java.util.List;
@RunWith(Parameterized.class)
public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@@ -40,44 +41,29 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String edgesPath;
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
-
- File edgesFile = tempFolder.newFile();
- Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
-
- edgesPath = edgesFile.toURI().toString();
-
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
// --------------------------------------------------------------------------------------------
// Connected Components Test
// --------------------------------------------------------------------------------------------
@Test
public void testConnectedComponents() throws Exception {
- GSAConnectedComponents.main(new String[]{edgesPath, resultPath, "16"});
- expectedResult = "1 1\n" +
- "2 1\n" +
- "3 1\n" +
- "4 1\n" +
- "5 1\n" +
- "6 6\n" +
- "7 6\n";
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+ new InitMapperCC(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16))
+ .getVertices().collect();
+ expectedResult = "1,1\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,1\n";
+
+ compareResultAsTuples(result, expectedResult);
}
// --------------------------------------------------------------------------------------------
@@ -86,26 +72,35 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Test
public void testSingleSourceShortestPaths() throws Exception {
- GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
- expectedResult = "1 0.0\n" +
- "2 12.0\n" +
- "3 13.0\n" +
- "4 47.0\n" +
- "5 48.0\n" +
- "6 Infinity\n" +
- "7 Infinity\n";
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+ new InitMapperSSSP(), env);
+
+ List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
+ .getVertices().collect();
+
+ expectedResult = "1,0.0\n" +
+ "2,12.0\n" +
+ "3,13.0\n" +
+ "4,47.0\n" +
+ "5,48.0\n";
+
+ compareResultAsTuples(result, expectedResult);
}
- // --------------------------------------------------------------------------------------------
- // Sample data
- // --------------------------------------------------------------------------------------------
+ @SuppressWarnings("serial")
+ private static final class InitMapperCC implements MapFunction<Long, Long> {
+ public Long map(Long value) {
+ return value;
+ }
+ }
- private static final String EDGES = "1 2 12.0\n" +
- "1 3 13.0\n" +
- "2 3 23.0\n" +
- "3 4 34.0\n" +
- "3 5 35.0\n" +
- "4 5 45.0\n" +
- "5 1 51.0\n" +
- "6 7 67.0\n";
-}
+ @SuppressWarnings("serial")
+ private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 0.0;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
deleted file mode 100644
index 1302424..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.CommunityDetection;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class CommunityDetectionITCase extends MultipleProgramsTestBase {
-
- private String edgesPath;
-
- private String resultPath;
-
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- public CommunityDetectionITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
- @Test
- public void testSingleIteration() throws Exception {
- /*
- * Test one iteration of the Simple Community Detection Example
- */
- final String edges = "1 2 1.0\n" + "1 3 2.0\n" + "1 4 3.0\n" + "1 5 4.0\n" + "2 6 5.0\n" +
- "6 7 6.0\n" + "6 8 7.0\n" + "7 8 8.0";
- edgesPath = createTempFile(edges);
-
- CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
- CommunityDetectionData.DELTA + ""});
-
- expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
- }
-
- @Test
- public void testTieBreaker() throws Exception {
- /*
- * Test one iteration of the Simple Community Detection Example where a tie must be broken
- */
-
- final String edges = "1 2 1.0\n" + "1 3 1.0\n" + "1 4 1.0\n" + "1 5 1.0";
- edgesPath = createTempFile(edges);
-
- CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
- CommunityDetectionData.DELTA + ""});
-
- expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
- }
-
-
- // -------------------------------------------------------------------------
- // Util methods
- // -------------------------------------------------------------------------
- private String createTempFile(final String rows) throws Exception {
- File tempFile = tempFolder.newFile();
- Files.write(rows, tempFile, Charsets.UTF_8);
- return tempFile.toURI().toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
deleted file mode 100644
index f2f3d8c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-import java.io.BufferedReader;
-
-@SuppressWarnings("serial")
-public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
-
- private static final long SEED = 9487520347802987L;
-
- private static final int NUM_VERTICES = 1000;
-
- private static final int NUM_EDGES = 10000;
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempFilePath("results");
- }
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
- DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-
- DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
-
- DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- DataSet<Vertex<Long, Long>> result = graph
- .run(new ConnectedComponentsAlgorithm(100)).getVertices();
-
- result.writeAsCsv(resultPath, "\n", " ");
- env.execute();
- }
-
- /**
- * A map function that takes a Long value and creates a 2-tuple out of it:
- * <pre>(Long value) -> (value, value)</pre>
- */
- public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
- @Override
- public Vertex<Long, Long> map(Long value) {
- return new Vertex<Long, Long>(value, value);
- }
- }
-
- @Override
- protected void postSubmit() throws Exception {
- for (BufferedReader reader : getResultReader(resultPath)) {
- ConnectedComponentsData.checkOddEvenResult(reader);
- }
- }
-
- public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
- public Edge<Long, NullValue> map(String value) {
- String[] nums = value.split(" ");
- return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
- NullValue.getInstance());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
deleted file mode 100644
index 858d06c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagation;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationITCase extends MultipleProgramsTestBase {
-
- public LabelPropagationITCase(TestExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testSingleIteration() throws Exception {
- /*
- * Test one iteration of label propagation example with a simple graph
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 30\n" +
- "4 40\n" +
- "5 40\n" +
- "6 40\n" +
- "7 70\n";
-
- final String edges = "1 3\n" +
- "2 3\n" +
- "4 7\n" +
- "5 7\n" +
- "6 7\n" +
- "7 3\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,40\n" +
- "5,40\n" +
- "6,40\n" +
- "7,40\n";
- }
-
- @Test
- public void testTieBreaker() throws Exception {
- /*
- * Test the label propagation example where a tie must be broken
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 10\n" +
- "4 10\n" +
- "5 0\n" +
- "6 20\n" +
- "7 20\n" +
- "8 20\n" +
- "9 20\n";
-
- final String edges = "1 5\n" +
- "2 5\n" +
- "3 5\n" +
- "4 5\n" +
- "6 5\n" +
- "7 5\n" +
- "8 5\n" +
- "9 5\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,10\n" +
- "5,20\n" +
- "6,20\n" +
- "7,20\n" +
- "8,20\n" +
- "9,20\n";
- }
-
- // -------------------------------------------------------------------------
- // Util methods
- // -------------------------------------------------------------------------
-
- private String createTempFile(final String rows) throws Exception {
- File tempFile = tempFolder.newFile();
- Files.write(rows, tempFile, Charsets.UTF_8);
- return tempFile.toURI().toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
deleted file mode 100644
index cde959f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import java.io.File;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.GSAPageRank;
-import org.apache.flink.graph.example.PageRank;
-import org.apache.flink.graph.example.utils.PageRankData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
- public PageRankITCase(TestExecutionMode mode){
- super(mode);
- }
-
- private String edgesPath;
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
-
- File edgesFile = tempFolder.newFile();
- Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
-
- edgesPath = edgesFile.toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareKeyValuePairsWithDelta(expected, resultPath, "\t", 0.01);
- }
-
- @Test
- public void testPageRankWithThreeIterations() throws Exception {
- PageRank.main(new String[] {edgesPath, resultPath, "3"});
- expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
-
- @Test
- public void testGSAPageRankWithThreeIterations() throws Exception {
- GSAPageRank.main(new String[] {edgesPath, resultPath, "3"});
- expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index 2e68b0a..d8f8c8f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.graph.test.example;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
import org.apache.flink.graph.example.SingleSourceShortestPaths;
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -65,6 +67,13 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
}
+ @Test
+ public void testGSASSSPExample() throws Exception {
+ GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+ edgesPath, resultPath, 10 + ""});
+ expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+ }
+
@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..104996e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetection;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+ public CommunityDetectionITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private String expected;
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example
+ */
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+ CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ .getVertices().collect();
+
+ expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example where a tie must be broken
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+ CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ .getVertices().collect();
+ expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
+ compareResultAsTuples(result, expected);
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitLabels implements MapFunction<Long, Long>{
+
+ public Long map(Long id) {
+ return id;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..ef4b467
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
+
+ private static final long SEED = 9487520347802987L;
+
+ private static final int NUM_VERTICES = 1000;
+
+ private static final int NUM_EDGES = 10000;
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempFilePath("results");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+ DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+
+ DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
+
+ DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+ DataSet<Vertex<Long, Long>> result = graph
+ .run(new ConnectedComponents(100)).getVertices();
+
+ result.writeAsCsv(resultPath, "\n", " ");
+ env.execute();
+ }
+
+ /**
+ * A map function that takes a Long value and creates a 2-tuple out of it:
+ * <pre>(Long value) -> (value, value)</pre>
+ */
+ public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
+ @Override
+ public Vertex<Long, Long> map(Long value) {
+ return new Vertex<Long, Long>(value, value);
+ }
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ for (BufferedReader reader : getResultReader(resultPath)) {
+ ConnectedComponentsData.checkOddEvenResult(reader);
+ }
+ }
+
+ public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
+ public Edge<Long, NullValue> map(String value) {
+ String[] nums = value.split(" ");
+ return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+ NullValue.getInstance());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..da36ef6
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.LabelPropagationData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+ public LabelPropagationITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String expectedResult;
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+	 * Test one iteration of the label propagation example with a simple graph
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ LabelPropagationData.getDefaultVertexSet(env),
+ LabelPropagationData.getDefaultEdgeDataSet(env), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+ .getVertices().collect();
+
+ expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test the label propagation example where a tie must be broken
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ LabelPropagationData.getTieVertexSet(env),
+ LabelPropagationData.getTieEdgeDataSet(env), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+ .getVertices().collect();
+
+ expectedResult = LabelPropagationData.LABELS_WITH_TIE;
+ compareResultAsTuples(result, expectedResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
new file mode 100644
index 0000000..cc1132d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PageRankData;
+import org.apache.flink.graph.library.GSAPageRank;
+import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+ public PageRankITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String expectedResult;
+
+ @Test
+ public void testPageRankWithThreeIterations() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
+
+ Graph<Long, Double, Double> networkWithWeights = inputGraph
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ List<Vertex<Long, Double>> result = networkWithWeights.run(new PageRank<Long>(0.85, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ @Test
+ public void testGSAPageRankWithThreeIterations() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
+
+ Graph<Long, Double, Double> networkWithWeights = inputGraph
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ List<Vertex<Long, Double>> result = networkWithWeights.run(new GSAPageRank<Long>(0.85, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ private void compareWithDelta(List<Vertex<Long, Double>> result,
+ String expectedResult, double delta) {
+
+ String resultString = "";
+ for (Vertex<Long, Double> v : result) {
+ resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
+ }
+
+ expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+ String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+ String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
+
+ Arrays.sort(expected);
+ Arrays.sort(resultArray);
+
+ for (int i = 0; i < expected.length; i++) {
+ String[] expectedFields = expected[i].split(",");
+ String[] resultFields = resultArray[i].split(",");
+
+ double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+ double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+ Assert.assertTrue("Values differ by more than the permissible delta",
+ Math.abs(expectedPayLoad - resultPayLoad) < delta);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitMapper implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 1.0;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ }
+}