You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:31 UTC
[02/10] flink git commit: [FLINK-1523] [gelly] Vertex centric
iteration extensions
[FLINK-1523] [gelly] Vertex centric iteration extensions
Removed trailing splits into vertex key and value
Made the extensions optional and separate
Removed overhead when the options are not used
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/585d27d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/585d27d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/585d27d0
Branch: refs/heads/master
Commit: 585d27d0d7c882797f813a25d21485187b546145
Parents: 8f4039d
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 16 14:04:23 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:03 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 62 +++
.../main/java/org/apache/flink/graph/Graph.java | 4 +-
.../flink/graph/IterationConfiguration.java | 1 -
.../java/org/apache/flink/graph/Vertex.java | 26 +-
.../flink/graph/example/CommunityDetection.java | 4 +-
.../graph/example/IncrementalSSSPExample.java | 304 +++++++++++
.../example/utils/IncrementalSSSPData.java | 91 ++++
.../library/CommunityDetectionAlgorithm.java | 13 +-
.../library/LabelPropagationAlgorithm.java | 9 +-
.../flink/graph/library/PageRankAlgorithm.java | 13 +-
.../SingleSourceShortestPathsAlgorithm.java | 10 +-
.../flink/graph/spargel/MessagingFunction.java | 75 ++-
.../spargel/VertexCentricConfiguration.java | 38 ++
.../graph/spargel/VertexCentricIteration.java | 528 +++++++++++++++----
.../graph/spargel/VertexUpdateFunction.java | 89 +++-
.../test/CollectionModeSuperstepITCase.java | 13 +-
.../test/VertexCentricConfigurationITCase.java | 16 +-
...ctedComponentsWithRandomisedEdgesITCase.java | 1 -
.../test/example/IncrementalSSSPITCase.java | 84 +++
19 files changed, 1211 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index abd489c..9089e09 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -438,6 +438,68 @@ public static final class Messenger extends MessagingFunction {...}
{% endhighlight %}
+### Vertex-Centric Iteration Extensions
+A vertex-centric iteration can be extended with information such as the total number of vertices,
+the in-degree and the out-degree of each vertex. Additionally, the neighborhood type (in/out/all) over which to
+run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used
+to modify the current vertex's state and messages are sent to out-neighbors.
+
+The options are configured as follows; the number-of-vertices and degree options are only activated when set to `true`:
+
+<strong>Number of Vertices</strong>: Provides access to the total number of vertices within the iteration. This property
+can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function
+using the `getNumberOfVertices()` method.
+
+<strong>Degrees</strong>: Provides access to the in/out-degree of a vertex within an iteration. This property can be set
+using the `setOptDegrees()` method.
+
+The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex
+using `vertex.getInDegree()` or `vertex.getOutDegree()`.
+
+<strong>Messaging Direction</strong>: The direction in which messages are sent. This can be one of `EdgeDirection.IN`,
+`EdgeDirection.OUT` or `EdgeDirection.ALL`. The messaging direction also dictates the update direction, which is
+`EdgeDirection.OUT`, `EdgeDirection.IN` or `EdgeDirection.ALL`, respectively. This property can be set using the
+`setDirection()` method.
+
+{% highlight java %}
+Graph<Long, Double, Double> graph = ...
+
+// create the vertex-centric iteration
+VertexCentricIteration<Long, Double, Double, Double> iteration =
+ graph.createVertexCentricIteration(
+ new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+
+// set the messaging direction
+iteration.setDirection(EdgeDirection.IN);
+
+// set the number of vertices option to true
+iteration.setOptNumVertices(true);
+
+// set the degree option to true
+iteration.setOptDegrees(true);
+
+// run the computation; the result is returned as a new graph
+Graph<Long, Double, Double> result = graph.runVertexCentricIteration(iteration);
+
+// user-defined functions
+public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+ ...
+ // get the number of vertices
+ long numVertices = getNumberOfVertices();
+ ...
+}
+
+public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
+ ...
+ // get the out-degree of the vertex and decrement it
+ long outDegree = vertex.getOutDegree() - 1;
+ ...
+}
+
+{% endhighlight %}
+
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 6e5b8f1..6b632bc 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1143,7 +1143,7 @@ public class Graph<K, VV, EV> {
/**
* Runs a Vertex-Centric iteration on the graph.
* No configuration options are provided.
-
+ *
* @param vertexUpdateFunction the vertex update function
* @param messagingFunction the messaging function
* @param maximumNumberOfIterations maximum number of iterations to perform
@@ -1181,7 +1181,7 @@ public class Graph<K, VV, EV> {
iteration.configure(parameters);
- DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+ DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index e1d4a1e..3086172 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
-
import com.google.common.base.Preconditions;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
index c5eb973..985ea45 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -31,11 +31,19 @@ public class Vertex<K, V> extends Tuple2<K, V> {
private static final long serialVersionUID = 1L;
- public Vertex(){}
+	private Long inDegree;
+	private Long outDegree;
+
+	/** Creates an empty vertex (id and value not set) whose in- and out-degree are both 0. */
+	public Vertex() {
+		inDegree = outDegree = 0L;
+	}
public Vertex(K k, V val) {
this.f0 = k;
this.f1 = val;
+ inDegree = 0L; // degrees default to 0 until set via setInDegree()
+ outDegree = 0L; // ... and setOutDegree()
}
public K getId() {
@@ -53,4 +61,20 @@ public class Vertex<K, V> extends Tuple2<K, V> {
public void setValue(V val) {
this.f1 = val;
}
+
+	// The degrees are meaningful inside a vertex-centric iteration run with the degrees
+	// option (setOptDegrees) enabled; otherwise they keep the constructors' default of 0
+	// unless they are set explicitly through the setters below.
+
+	/** Returns the in-degree of this vertex; 0 unless it has been set. */
+	public Long getInDegree() { return this.inDegree; }
+
+	/** Sets the in-degree of this vertex. */
+	public void setInDegree(Long inDegree) { this.inDegree = inDegree; }
+
+	/** Returns the out-degree of this vertex; 0 unless it has been set. */
+	public Long getOutDegree() { return this.outDegree; }
+
+	/** Sets the out-degree of this vertex. */
+	public void setOutDegree(Long outDegree) { this.outDegree = outDegree; }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
index f9434d3..30005e7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
@@ -64,8 +64,8 @@ public class CommunityDetection implements ProgramDescription {
DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
new MapFunction<Long, Long>() {
- @Override
- public Long map(Long label) throws Exception {
+ @Override
+ public Long map(Long label) {
return label;
}
}, env);
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
new file mode 100644
index 0000000..b06425d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample <vertex path> <edge path> <edges in SSSP>
+ * <edge to be removed> <result path> <number of iterations></code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+ public static void main(String [] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
+
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+ DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
+
+ Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+ // Assumption: all minimum weight paths are kept
+ Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+ // remove the edge
+ graph.removeEdge(edgeToBeRemoved);
+
+ // configure the iteration
+ IterationConfiguration parameters = new IterationConfiguration();
+
+ if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+ parameters.setDirection(EdgeDirection.IN);
+ parameters.setOptDegrees(true);
+
+ // run the vertex centric iteration to propagate info
+ Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
+ new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+
+ DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
+
+ // Emit results
+ if(fileOutput) {
+ resultedVertices.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ resultedVertices.print();
+ }
+
+ env.execute("Incremental SSSP Example");
+ } else {
+ // print the vertices
+ if(fileOutput) {
+ vertices.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ vertices.print();
+ }
+
+ env.execute("Incremental SSSP Example");
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "Incremental Single Sink Shortest Paths Example";
+ }
+
+ // ******************************************************************************************************************
+ // IncrementalSSSP METHODS
+ // ******************************************************************************************************************
+
+ /**
+ * Function that verifies whether the edge to be removed is part of the SSSP or not.
+ * If it is, the src vertex will be invalidated.
+ *
+ * @param edgeToBeRemoved
+ * @param edgesInSSSP
+ * @return
+ */
+ private static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+
+ return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() {
+ @Override
+ public boolean filter(Edge<Long, Double> edge) throws Exception {
+ return edge.equals(edgeToBeRemoved);
+ }
+ }).count() > 0;
+ }
+
+ public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+ @Override
+ public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
+ if (inMessages.hasNext()) {
+ Long outDegree = vertex.getOutDegree() - 1;
+ // check if the vertex has another SP-Edge
+ if (outDegree > 0) {
+ // there is another shortest path from the source to this vertex
+ } else {
+ // set own value to infinity
+ setNewVertexValue(Double.MAX_VALUE);
+ }
+ }
+ }
+ }
+
+ public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+ private Edge<Long, Double> edgeToBeRemoved;
+
+ public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
+ this.edgeToBeRemoved = edgeToBeRemoved;
+ }
+
+ @Override
+ public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
+
+
+ if(getSuperstepNumber() == 1) {
+ if(vertex.getId().equals(edgeToBeRemoved.getSource())) {
+ // activate the edge target
+ sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
+ }
+ }
+
+ if(getSuperstepNumber() > 1) {
+ // invalidate all edges
+ for(Edge<Long, Double> edge : getEdges()) {
+ sendMessageTo(edge.getSource(), Double.MAX_VALUE);
+ }
+ }
+ }
+ }
+
+ // ******************************************************************************************************************
+ // UTIL METHODS
+ // ******************************************************************************************************************
+
+ private static boolean fileOutput = false;
+
+ private static String verticesInputPath = null;
+
+ private static String edgesInputPath = null;
+
+ private static String edgesInSSSPInputPath = null;
+
+ private static String edgeToBeRemoved = null;
+
+ private static String outputPath = null;
+
+ private static int maxIterations = 5;
+
+ private static boolean parseParameters(String[] args) {
+ if (args.length > 0) {
+ if (args.length == 6) {
+ fileOutput = true;
+ verticesInputPath = args[0];
+ edgesInputPath = args[1];
+ edgesInSSSPInputPath = args[2];
+ edgeToBeRemoved = args[3];
+ outputPath = args[4];
+ maxIterations = Integer.parseInt(args[5]);
+ } else {
+ System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("See the documentation for the correct format of input files.");
+ System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path> <max iterations>");
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(verticesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class)
+ .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+
+ @Override
+ public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
+ return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
+ }
+ });
+ } else {
+ System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+ "<output path> <max iterations>");
+ return IncrementalSSSPData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+
+ @Override
+ public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
+ return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
+ }
+ });
+ } else {
+ System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+ "<output path> <max iterations>");
+ return IncrementalSSSPData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgesinSSSPDataSet(ExecutionEnvironment env) {
+ if (fileOutput) {
+ return env.readCsvFile(edgesInSSSPInputPath)
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+
+ @Override
+ public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
+ return new Edge(tuple3.f0, tuple3.f1, tuple3.f2);
+ }
+ });
+ } else {
+ System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+ "<output path> <max iterations>");
+ return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+ }
+ }
+
+ private static Edge<Long, Double> getEdgeToBeRemoved() {
+ if (fileOutput) {
+ String [] edgeComponents = edgeToBeRemoved.split(",");
+
+ return new Edge<Long, Double>(Long.parseLong(edgeComponents[0]), Long.parseLong(edgeComponents[1]),
+ Double.parseDouble(edgeComponents[2]));
+ } else {
+ System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+ "<output path> <max iterations>");
+ return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
new file mode 100644
index 0000000..8c38d56
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the IncrementalSSSP example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public final class IncrementalSSSPData {
+
+	/** Number of vertices in {@link #VERTICES}. */
+	public static final int NUM_VERTICES = 5;
+
+	public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
+
+	/** Returns the vertices of {@link #VERTICES} as (id, distance to the sink vertex 5) pairs. */
+	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 6.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 1.0));
+		vertices.add(new Vertex<Long, Double>(5L, 0.0));
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" +
+			"4,5,1.0";
+
+	/** Returns the edges of {@link #EDGES} as (source id, target id, value) triples. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 5.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+		return env.fromCollection(edges);
+	}
+
+	public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0";
+
+	/** Returns the edges of {@link #EDGES_IN_SSSP}, i.e. the edges lying on a shortest path to the sink. */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+		return env.fromCollection(edges);
+	}
+
+	public static final String EDGE_TO_BE_REMOVED = "2,5,2.0";
+
+	/** Returns the edge described by {@link #EDGE_TO_BE_REMOVED}. */
+	public static Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+		return new Edge<Long, Double>(2L, 5L, 2.0);
+	}
+
+	/** Expected vertex values after removing the default edge; invalidated vertices hold Double.MAX_VALUE. */
+	public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE + "\n" + "2," + Double.MAX_VALUE + "\n"
+			+ "3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0";
+
+	/** Utility class; not meant to be instantiated. */
+	private IncrementalSSSPData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
index 47e3595..6f72deb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
@@ -79,7 +79,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
}
@Override
- public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
+ public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
// we would like these two maps to be ordered
@@ -116,7 +116,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
if(receivedLabelsWithScores.size() > 0) {
// find the label with the highest score from the ones received
Double maxScore = -Double.MAX_VALUE;
- Long maxScoreLabel = labelScore.f0;
+ Long maxScoreLabel = vertex.getValue().f0;
for (Long curLabel : receivedLabelsWithScores.keySet()) {
if (receivedLabelsWithScores.get(curLabel) > maxScore) {
@@ -128,7 +128,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
// find the highest score of maxScoreLabel
Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
// re-score the new label
- if (maxScoreLabel != labelScore.f0) {
+ if (maxScoreLabel != vertex.getValue().f0) {
highestScore -= delta / getSuperstepNumber();
}
// else delta = 0
@@ -143,10 +143,11 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
Tuple2<Long, Double>, Double> {
@Override
- public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
+ public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
- for(Edge<Long, Double> edge : getOutgoingEdges()) {
- sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+ for(Edge<Long, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+ vertex.getValue().f1 * edge.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
index 7fc0614..0b0f4fc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -66,12 +67,12 @@ public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
*/
public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
- public void updateVertex(K vertexKey, Long vertexValue,
+ public void updateVertex(Vertex<K, Long> vertex,
MessageIterator<Long> inMessages) {
Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
long maxFrequency = 1;
- long mostFrequentLabel = vertexValue;
+ long mostFrequentLabel = vertex.getValue();
// store the labels with their frequencies
for (Long msg : inMessages) {
@@ -107,8 +108,8 @@ public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
*/
public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
- public void sendMessages(K vertexKey, Long newLabel) {
- sendMessageToAllNeighbors(newLabel);
+ public void sendMessages(Vertex<K, Long> vertex) {
+ sendMessageToAllNeighbors(vertex.getValue());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
index 4803b44..f63fb0d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.library;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -63,8 +64,7 @@ public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implement
}
@Override
- public void updateVertex(K vertexKey, Double vertexValue,
- MessageIterator<Double> inMessages) {
+ public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
double rankSum = 0.0;
for (double msg : inMessages) {
rankSum += msg;
@@ -91,13 +91,14 @@ public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implement
}
@Override
- public void sendMessages(K vertexId, Double newRank) {
+ public void sendMessages(Vertex<K, Double> vertex) {
if (getSuperstepNumber() == 1) {
// initialize vertex ranks
- newRank = 1.0 / numVertices;
+ vertex.setValue(new Double(1.0 / numVertices));
}
- for (Edge<K, Double> edge : getOutgoingEdges()) {
- sendMessageTo(edge.getTarget(), newRank * edge.getValue());
+
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
index 41180cd..e78ae3e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
@@ -75,7 +75,7 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
@Override
- public void updateVertex(K vertexKey, Double vertexValue,
+ public void updateVertex(Vertex<K, Double> vertex,
MessageIterator<Double> inMessages) {
Double minDistance = Double.MAX_VALUE;
@@ -86,7 +86,7 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
}
}
- if (vertexValue > minDistance) {
+ if (vertex.getValue() > minDistance) {
setNewVertexValue(minDistance);
}
}
@@ -101,10 +101,10 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
@Override
- public void sendMessages(K vertexKey, Double newDistance)
+ public void sendMessages(Vertex<K, Double> vertex)
throws Exception {
- for (Edge<K, Double> edge : getOutgoingEdges()) {
- sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
+ // propose to every neighbor the current distance of this vertex plus the weight of the
+ // connecting edge; VertexDistanceUpdater only adopts a proposal that is smaller than its value
+ for (Edge<K, Double> edge : getEdges()) {
+ sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index ab60f15..724a890 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,7 +26,10 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
@@ -41,21 +44,50 @@ import org.apache.flink.util.Collector;
public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
private static final long serialVersionUID = 1L;
-
+
+ // --------------------------------------------------------------------------------------------
+ // Attribute that gives vertices access to the total number of vertices inside an iteration.
+ // (When the degrees option is enabled, the in/out degrees are exposed through the Vertex that is
+ // passed to sendMessages.)
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices;
+
+ /**
+ * Returns the total number of vertices of the graph the iteration runs on.
+ *
+ * @return the number of vertices. The {@link VertexCentricIteration} only computes and sets this
+ * value when the numVertices option is enabled in its {@link VertexCentricConfiguration};
+ * otherwise it is presumably left at its default of 0.
+ */
+ public long getNumberOfVertices() {
+ return numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Attribute that allows the user to choose the neighborhood type (in/out/all) on which to run
+ // the vertex-centric iteration.
+ // --------------------------------------------------------------------------------------------
+
+ private EdgeDirection direction;
+
+ /**
+ * Returns the direction of the edges along which this function sends messages.
+ *
+ * @return the edge direction; the {@link VertexCentricIteration} sets it from its configuration
+ * before building the iteration, falling back to {@link EdgeDirection#OUT}
+ */
+ public EdgeDirection getDirection() {
+ return direction;
+ }
+
+ /**
+ * Sets the direction of the edges along which this function sends messages.
+ *
+ * <p>Note that the {@link VertexCentricIteration} overwrites this value when it builds the
+ * iteration (from its configuration, or {@link EdgeDirection#OUT} if there is none).
+ *
+ * @param direction the edge direction
+ */
+ public void setDirection(EdgeDirection direction) {
+ this.direction = direction;
+ }
+
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
-
+
/**
* This method is invoked once per superstep for each vertex that was changed in that superstep.
* It needs to produce the messages that will be received by vertices in the next superstep.
*
- * @param vertexKey The key of the vertex that was changed.
- * @param vertexValue The value (state) of the vertex that was changed.
+ * @param vertex The vertex that was changed. Its in- and out-degrees are only set by the
+ * iteration when the degrees option is enabled in the {@link VertexCentricConfiguration}.
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
- public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+ public abstract void sendMessages(Vertex<VertexKey, VertexValue> vertex) throws Exception;
/**
* This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -73,30 +105,30 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
/**
- * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
+ * Gets an {@link java.lang.Iterable} with the edges of the changed vertex. Which edges are supplied
+ * depends on the {@link EdgeDirection} the iteration runs with (see {@link #getDirection()}).
+ * This method is mutually exclusive with
* {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
*
- * @return An iterator with all outgoing edges.
+ * @return An iterable over the edges of the changed vertex.
+ * @throws IllegalStateException if the edges were already consumed by this method or by
+ * {@link #sendMessageToAllNeighbors(Object)}.
*/
@SuppressWarnings("unchecked")
- public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
+ public Iterable<Edge<VertexKey, EdgeValue>> getEdges() {
if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
}
edgesUsed = true;
this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
return this.edgeIterator;
}
-
+
/**
* Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
- * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
+ * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
*
* @param m The message to send.
*/
public void sendMessageToAllNeighbors(Message m) {
if (edgesUsed) {
- throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+ throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
}
edgesUsed = true;
@@ -216,6 +248,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
@Override
public Edge<VertexKey, EdgeValue> next() {
Edge<VertexKey, EdgeValue> next = input.next();
+ edge.setSource(next.f0);
edge.setTarget(next.f1);
edge.setValue(next.f2);
return edge;
@@ -230,4 +263,24 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
return this;
}
}
+
+ /**
+ * Adapter called by {@link org.apache.flink.graph.spargel.VertexCentricIteration} when the degrees
+ * option is enabled. In that case the vertex state carries a Tuple3(actualValue, inDegree, outDegree)
+ * instead of the plain value, which is hidden from the user.
+ *
+ * This function builds a regular vertex from the vertex state, with the actual value and the in/out
+ * degrees taken from the tuple, and then calls the regular {@link #sendMessages(Vertex)} function.
+ *
+ * @param newVertexState the changed vertex, whose value is the Tuple3(actualValue, inDegree, outDegree)
+ * @throws Exception any exception thrown by {@link #sendMessages(Vertex)}
+ */
+ void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
+ throws Exception {
+ // read the wrapped value once instead of once per field
+ Tuple3<VertexValue, Long, Long> valueWithDegrees = newVertexState.getValue();
+
+ Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(newVertexState.getId(),
+ valueWithDegrees.f0);
+ vertex.setInDegree(valueWithDegrees.f1);
+ vertex.setOutDegree(valueWithDegrees.f2);
+
+ sendMessages(vertex);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
index 4b353d5..549a795 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.spargel;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.IterationConfiguration;
import java.util.ArrayList;
@@ -43,6 +44,15 @@ public class VertexCentricConfiguration extends IterationConfiguration {
/** the broadcast variables for the messaging function **/
private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+ /** whether the in- and out-degrees of the vertices are made available inside the iteration **/
+ private boolean optDegrees;
+
+ /** whether the total number of vertices is made available inside the iteration **/
+ private boolean optNumVertices;
+
+ /** the edge direction along which the messages are sent; OUT unless configured otherwise **/
+ private EdgeDirection direction = EdgeDirection.OUT;
+
public VertexCentricConfiguration() {}
/**
@@ -85,5 +95,33 @@ public class VertexCentricConfiguration extends IterationConfiguration {
return this.bcVarsMessaging;
}
+ // ----------------------------------------------------------------------------------
+ // The direction, degrees and the total number of vertices are optional.
+ // The user can access them by setting the direction, degrees or the numVertices options.
+ // ----------------------------------------------------------------------------------
+
+ /**
+ * @return {@code true} if the in- and out-degrees of the vertices are made available to the
+ * messaging and update functions
+ */
+ public boolean isOptDegrees() {
+ return optDegrees;
+ }
+
+ /**
+ * Sets whether the in- and out-degrees of the vertices are made available to the messaging and
+ * update functions. Disabled by default.
+ *
+ * @param optDegrees {@code true} to enable the degrees option
+ */
+ public void setOptDegrees(boolean optDegrees) {
+ this.optDegrees = optDegrees;
+ }
+
+ /**
+ * @return {@code true} if the total number of vertices is computed before the iteration and made
+ * available to the messaging and update functions
+ */
+ public boolean isOptNumVertices() {
+ return optNumVertices;
+ }
+
+ /**
+ * Sets whether the total number of vertices is computed before the iteration and made available
+ * to the messaging and update functions. Disabled by default.
+ *
+ * @param optNumVertices {@code true} to enable the number of vertices option
+ */
+ public void setOptNumVertices(boolean optNumVertices) {
+ this.optNumVertices = optNumVertices;
+ }
+
+ /**
+ * @return the edge direction along which the messages are sent; {@link EdgeDirection#OUT} by default
+ */
+ public EdgeDirection getDirection() {
+ return direction;
+ }
+
+ /**
+ * Sets the edge direction along which the messages are sent.
+ *
+ * @param direction the edge direction (IN, OUT or ALL)
+ * @throws IllegalArgumentException if {@code direction} is {@code null}; a null direction would
+ * otherwise only fail later, when the iteration is built
+ */
+ public void setDirection(EdgeDirection direction) {
+ if (direction == null) {
+ throw new IllegalArgumentException("The edge direction must not be null.");
+ }
+ this.direction = direction;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 79da664..95c6090 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -22,6 +22,11 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -29,11 +34,14 @@ import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
@@ -71,7 +79,7 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
{
private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-
+
private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
@@ -83,7 +91,8 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
private VertexCentricConfiguration configuration;
-
+
+ private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
// ----------------------------------------------------------------------------------
private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
@@ -135,69 +144,46 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
if (this.initialVertices == null) {
throw new IllegalStateException("The input data set has not been set.");
}
-
+
// prepare some type information
- TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
- final int[] zeroKeyPos = new int[] {0};
-
- final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
- this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+ // create a graph; use the execution environment of the input vertices rather than the global
+ // default one, so that the graph belongs to the same environment as its data sets
+ Graph<VertexKey, VertexValue, EdgeValue> graph =
+ Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
- // set up the iteration operator
- if (this.configuration != null) {
+ // check whether the numVertices option is set and, if so, compute the total number of vertices
+ // and set it within the messaging and update functions
- iteration.name(this.configuration.getName(
- "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
- iteration.parallelism(this.configuration.getParallelism());
- iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
- // register all aggregators
- for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
- iteration.registerAggregator(entry.getKey(), entry.getValue());
+ if (this.configuration != null && this.configuration.isOptNumVertices()) {
+ try {
+ long numberOfVertices = graph.numberOfVertices();
+ messagingFunction.setNumberOfVertices(numberOfVertices);
+ updateFunction.setNumberOfVertices(numberOfVertices);
+ } catch (Exception e) {
+ // do not continue with an unset (zero) vertex count, which would silently produce wrong
+ // results; surface the failure, with its cause, to the caller instead
+ throw new RuntimeException("Could not compute the number of vertices for the vertex-centric iteration.", e);
}
}
- else {
- // no configuration provided; set default name
- iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
- }
-
- // build the messaging function (co group)
- CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
- MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
- messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-
- // configure coGroup message function with name and broadcast variables
- messages = messages.name("Messaging");
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
- messages = messages.withBroadcastSet(e.f1, e.f0);
- }
+ if(this.configuration != null) {
+ messagingFunction.setDirection(this.configuration.getDirection());
+ } else {
+ messagingFunction.setDirection(EdgeDirection.OUT);
}
-
- VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-
- // build the update function (co group)
- CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
- messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-
- // configure coGroup update function with name and broadcast variables
- updates = updates.name("Vertex State Updates");
- if (this.configuration != null) {
- for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
- updates = updates.withBroadcastSet(e.f1, e.f0);
- }
- }
+ // retrieve the direction in which the updates are made and in which the messages are sent
+ EdgeDirection messagingDirection = messagingFunction.getDirection();
- // let the operator know that we preserve the key field
- updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-
- return iteration.closeWith(updates, updates);
-
+ DataSet<Tuple2<VertexKey, Message>> messages = null;
+
+ // check whether the degrees option is set and, if so, compute the in and the out degrees and
+ // add them to the vertex value
+ if(this.configuration != null && this.configuration.isOptDegrees()) {
+ return createResultVerticesWithDegrees(graph, messagingDirection, messages, messageTypeInfo);
+ } else {
+ return createResultSimpleVertex(messagingDirection, messages, messageTypeInfo);
+ }
}
/**
@@ -224,47 +210,89 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
{
return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
}
-
+
+ /**
+ * Sets the parameters (name, parallelism, aggregators, broadcast variables and the optional
+ * direction / degrees / number-of-vertices settings) that this vertex-centric iteration is built with.
+ *
+ * @param parameters the configuration to use
+ */
+ public void configure(VertexCentricConfiguration parameters) {
+ configuration = parameters;
+ }
+
+ /**
+ * Returns the parameters this vertex-centric iteration is built with.
+ *
+ * @return the configuration; may be {@code null}, in which case the iteration uses its defaults
+ */
+ public VertexCentricConfiguration getIterationConfiguration() {
+ return configuration;
+ }
+
// --------------------------------------------------------------------------------------------
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
-
+
private static final class VertexUpdateUdf<VertexKey, VertexValue, Message>
extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
{
private static final long serialVersionUID = 1L;
- private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+ final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
- private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+ final MessageIterator<Message> messageIter = new MessageIterator<Message>();
- private transient TypeInformation<Vertex<VertexKey, VertexValue>> resultType;
+ private transient TypeInformation<Vertex<VertexKey, VV>> resultType;
private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
- TypeInformation<Vertex<VertexKey, VertexValue>> resultType)
+ TypeInformation<Vertex<VertexKey, VV>> resultType)
{
this.vertexUpdateFunction = vertexUpdateFunction;
this.resultType = resultType;
}
@Override
- public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Vertex<VertexKey, VertexValue>> vertex,
- Collector<Vertex<VertexKey, VertexValue>> out)
- throws Exception
- {
+ public void open(Configuration parameters) throws Exception {
+ // init(...) is only called when open runs in the first superstep; preSuperstep() on every call
+ boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
+ if (firstSuperstep) {
+ vertexUpdateFunction.init(getIterationRuntimeContext());
+ }
+ vertexUpdateFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ // let the wrapped update function know that the current superstep has finished
+ vertexUpdateFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Vertex<VertexKey, VV>> getProducedType() {
+ return resultType;
+ }
+ }
+
+ private static final class VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>
+ extends VertexUpdateUdf<VertexKey, VertexValue, Message> {
+
+
+ private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, VertexValue>> resultType) {
+ super(vertexUpdateFunction, resultType);
+ }
+
+ @Override
+ public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
+ Iterable<Vertex<VertexKey, VertexValue>> vertex,
+ Collector<Vertex<VertexKey, VertexValue>> out) throws Exception {
final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
-
+
if (vertexIter.hasNext()) {
Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
-
+
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
-
+
vertexUpdateFunction.setOutput(vertexState, out);
- vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
+ vertexUpdateFunction.updateVertex(vertexState, messageIter);
}
else {
final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
@@ -280,23 +308,45 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
}
}
-
- @Override
- public void open(Configuration parameters) throws Exception {
- if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
- this.vertexUpdateFunction.init(getIterationRuntimeContext());
- }
- this.vertexUpdateFunction.preSuperstep();
- }
-
- @Override
- public void close() throws Exception {
- this.vertexUpdateFunction.postSuperstep();
+ }
+
+ /**
+ * Vertex update UDF used when the degrees option is enabled: the solution set holds vertices whose
+ * value is a Tuple3(actualValue, inDegree, outDegree).
+ */
+ private static final class VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message> extends VertexUpdateUdf<VertexKey,
+ Tuple3<VertexValue, Long, Long>, VertexValue, Message> {
+
+
+ private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> resultType) {
+ super(vertexUpdateFunction, resultType);
}
@Override
- public TypeInformation<Vertex<VertexKey, VertexValue>> getProducedType() {
- return this.resultType;
+ public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
+ Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertex,
+ Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+ final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexIter = vertex.iterator();
+
+ if (vertexIter.hasNext()) {
+ // a solution-set entry exists for this key: run the user's update on it with its messages
+ Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState = vertexIter.next();
+
+ @SuppressWarnings("unchecked")
+ Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+ messageIter.setSource(downcastIter);
+
+ // NOTE(review): these variants presumably unwrap the Tuple3 for the user's update function and
+ // keep the degrees when the new value is emitted -- confirm in VertexUpdateFunction
+ vertexUpdateFunction.setOutputWithDegrees(vertexState, out);
+ vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter);
+ }
+ else {
+ // messages were sent to a vertex id that has no entry in the solution set
+ final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+ if (messageIter.hasNext()) {
+ String message = "Target vertex does not exist!.";
+ // best effort: include the offending id in the error message, otherwise keep the generic text
+ try {
+ Tuple2<VertexKey, Message> next = messageIter.next();
+ message = "Target vertex '" + next.f0 + "' does not exist!.";
+ } catch (Throwable t) {}
+ throw new Exception(message);
+ } else {
+ // NOTE(review): presumably unreachable, as a coGroup group contains at least one element
+ throw new Exception();
+ }
+ }
}
}
@@ -309,31 +359,17 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
{
private static final long serialVersionUID = 1L;
- private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+ final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-
-
+
+
private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
TypeInformation<Tuple2<VertexKey, Message>> resultType)
{
this.messagingFunction = messagingFunction;
this.resultType = resultType;
}
-
- @Override
- public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
- Iterable<Vertex<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
- throws Exception
- {
- final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
-
- if (stateIter.hasNext()) {
- Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
- messagingFunction.set((Iterator<?>) edges.iterator(), out);
- messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
- }
- }
@Override
public void open(Configuration parameters) throws Exception {
@@ -355,19 +391,301 @@ public class VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>
}
}
+ /**
+ * Messaging UDF for vertices without degree information: hands each workset vertex, together with
+ * its edges, directly to {@link MessagingFunction#sendMessages(Vertex)}.
+ */
+ private static final class MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>
+ extends MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> {
+
+ // UDFs are serialized; declare the id explicitly like the base class does
+ private static final long serialVersionUID = 1L;
+
+ private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+ TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+ super(messagingFunction, resultType);
+ }
+
+ @Override
+ public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+ Iterable<Vertex<VertexKey, VertexValue>> state,
+ Collector<Tuple2<VertexKey, Message>> out) throws Exception {
+ final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
+
+ // only vertices that are part of the current workset send messages; groups that contain
+ // edges but no workset vertex produce nothing
+ if (stateIter.hasNext()) {
+ Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
+ messagingFunction.set((Iterator<?>) edges.iterator(), out);
+ messagingFunction.sendMessages(newVertexState);
+ }
+ }
+ }
+
+ /**
+ * Messaging UDF used when the degrees option is enabled: the workset vertices carry a
+ * Tuple3(actualValue, inDegree, outDegree), which
+ * {@code MessagingFunction#sendMessagesFromVertexCentricIteration} unwraps before calling the
+ * user's sendMessages.
+ */
+ private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>
+ extends MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> {
+
+ // UDFs are serialized; declare the id explicitly like the base class does
+ private static final long serialVersionUID = 1L;
+
+ private MessagingUdfWithEdgeValuesVertexValueWithDegrees(
+ MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+ TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+ super(messagingFunction, resultType);
+ }
+
+ @Override
+ public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+ Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> state,
+ Collector<Tuple2<VertexKey, Message>> out) throws Exception {
+
+ final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> stateIter = state.iterator();
+
+ // only vertices that are part of the current workset send messages
+ if (stateIter.hasNext()) {
+ Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState = stateIter.next();
+ messagingFunction.set((Iterator<?>) edges.iterator(), out);
+ messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState);
+ }
+ }
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // UTIL methods
+ // --------------------------------------------------------------------------------------------
+
/**
- * Configures this vertex-centric iteration with the provided parameters.
+ * Method that builds the messaging function using a coGroup operator for a simple vertex(without
+ * degrees).
+ * It afterwards configures the function with a custom name and broadcast variables.
*
- * @param parameters the configuration parameters
+ * @param iteration
+ * @param messageTypeInfo
+ * @param whereArg the argument for the where within the coGroup
+ * @param equalToArg the argument for the equalTo within the coGroup
+ * @return the messaging function
*/
- public void configure(VertexCentricConfiguration parameters) {
- this.configuration = parameters;
+ private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunction(
+ DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration,
+ TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+ MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> udf =
+ new MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>(
+ messagingFunction, messageTypeInfo);
+
+ // co-group the edges with the changed vertices (workset) on the requested key fields
+ CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messagingOp = edgesWithValue
+ .coGroup(iteration.getWorkset())
+ .where(whereArg)
+ .equalTo(equalToArg)
+ .with(udf)
+ .name("Messaging");
+
+ // without a configuration there are no broadcast variables to attach
+ if (configuration == null) {
+ return messagingOp;
+ }
+ for (Tuple2<String, DataSet<?>> bcVar : configuration.getMessagingBcastVars()) {
+ messagingOp = messagingOp.withBroadcastSet(bcVar.f1, bcVar.f0);
+ }
+ return messagingOp;
}
/**
- * @return the configuration parameters of this vertex-centric iteration
+ * Method that builds the messaging function using a coGroup operator for a vertex
+ * containing degree information.
+ * It afterwards configures the function with a custom name and broadcast variables.
+ *
+ * @param iteration
+ * @param messageTypeInfo
+ * @param whereArg the argument for the where within the coGroup
+ * @param equalToArg the argument for the equalTo within the coGroup
+ * @return the messaging function
*/
- public VertexCentricConfiguration getIterationConfiguration() {
- return this.configuration;
+ private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunctionVerticesWithDegrees(
+ DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
+ Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration,
+ TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+ MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> udf =
+ new MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>(
+ messagingFunction, messageTypeInfo);
+
+ // co-group the edges with the changed vertices (workset, values carrying degrees) on the requested key fields
+ CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messagingOp = edgesWithValue
+ .coGroup(iteration.getWorkset())
+ .where(whereArg)
+ .equalTo(equalToArg)
+ .with(udf)
+ .name("Messaging");
+
+ // without a configuration there are no broadcast variables to attach
+ if (configuration == null) {
+ return messagingOp;
+ }
+ for (Tuple2<String, DataSet<?>> bcVar : configuration.getMessagingBcastVars()) {
+ messagingOp = messagingOp.withBroadcastSet(bcVar.f1, bcVar.f0);
+ }
+ return messagingOp;
+ }
+
+ /**
+ * Helper method which sets up a delta iteration for the given vertices. The vertex value can be
+ * either the simple value or the value wrapped together with the vertex degrees.
+ *
+ * The given vertices are used both as the initial solution set and as the initial workset. They are
+ * keyed on field 0 (the vertex id), and the iteration runs for at most the maximum number of iterations.
+ * If a configuration is present, its name, parallelism, solution set memory mode and aggregators
+ * are applied; otherwise the default name is used.
+ *
+ * @param vertices the initial vertices, used as solution set and as workset
+ * @param <VV> the type of the vertex value inside the iteration
+ * @return the configured delta iteration, not yet closed
+ */
+ private <VV> DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> setUpIteration(
+ DataSet<Vertex<VertexKey, VV>> vertices) {
+
+ final int[] zeroKeyPos = new int[] {0};
+
+ final DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> iteration =
+ vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos);
+
+ // built once: passed to getName(...) as the default, or used directly without a configuration
+ final String defaultName = "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
+
+ // set up the iteration operator
+ if (this.configuration != null) {
+
+ iteration.name(this.configuration.getName(defaultName));
+ iteration.parallelism(this.configuration.getParallelism());
+ iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+ // register all aggregators
+ for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+ iteration.registerAggregator(entry.getKey(), entry.getValue());
+ }
+ }
+ else {
+ // no configuration provided; set default name
+ iteration.name(defaultName);
+ }
+
+ return iteration;
+ }
+
+ /**
+ * Creates the operator that represents this vertex centric graph computation for a simple vertex,
+ * i.e. a vertex whose value does not carry degree information.
+ *
+ * The messages are produced by co-grouping the edges with the workset. For {@link EdgeDirection#OUT}
+ * the edges are keyed on their source (field 0), and for {@link EdgeDirection#IN} on their target
+ * (field 1). For {@link EdgeDirection#ALL}, the union of both is used. The update function then
+ * co-groups the messages with the solution set on the vertex id.
+ *
+ * @param messagingDirection the direction along which the messages are sent
+ * @param messages not used; the messages are built inside this method from {@code messagingDirection}
+ * @param messageTypeInfo the type information of the (target id, message) tuples
+ * @return the result of the closed delta iteration
+ * @throws IllegalArgumentException if {@code messagingDirection} is not IN, OUT or ALL
+ */
+ private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
+ DataSet<Tuple2<VertexKey, Message>> messages,
+ TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+ TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+
+ final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
+ setUpIteration(this.initialVertices);
+
+ // build the messages into a local instead of overwriting the (unused) parameter
+ final DataSet<Tuple2<VertexKey, Message>> directedMessages;
+ switch (messagingDirection) {
+ case IN:
+ directedMessages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+ break;
+ case OUT:
+ directedMessages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+ break;
+ case ALL:
+ directedMessages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
+ .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0));
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+
+ VertexUpdateUdf<VertexKey, VertexValue, VertexValue, Message> updateUdf =
+ new VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+
+ // build the update function (co group)
+ CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
+ directedMessages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+ configureUpdateFunction(updates);
+
+ // the updated vertices are both the solution set delta and the next workset
+ return iteration.closeWith(updates, updates);
+ }
+
+ /**
+ * Builds the vertex-centric computation for the case where every vertex value is temporarily widened
+ * to Tuple3(value, inDegree, outDegree), so that the user functions can access the vertex degrees.
+ *
+ * @param graph the graph whose in- and out-degrees are attached to the vertices
+ * @param messagingDirection the edge direction along which messages are sent (IN, OUT or ALL)
+ * @param messages not read; the message data set is always rebuilt from the iteration's workset
+ * @param messageTypeInfo the type information of the (target key, message) tuples
+ * @return the resulting vertices, with the degrees stripped off again
+ */
+ private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
+ Graph<VertexKey, VertexValue, EdgeValue> graph,
+ EdgeDirection messagingDirection,
+ DataSet<Tuple2<VertexKey, Message>> messages,
+ TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+
+ // NOTE(review): this.configuration is dereferenced without a null check; the caller presumably guarantees it
+ this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+
+ // (id, inDegree) joined with (id, outDegree) on the vertex id -> (id, inDegree, outDegree)
+ DataSet<Tuple3<VertexKey, Long, Long>> degrees = graph.inDegrees().join(graph.outDegrees()).where(0).equalTo(0)
+ .with(new FlatJoinFunction<Tuple2<VertexKey, Long>, Tuple2<VertexKey, Long>, Tuple3<VertexKey, Long, Long>>() {
+
+ @Override
+ public void join(Tuple2<VertexKey, Long> inDegree, Tuple2<VertexKey, Long> outDegree,
+ Collector<Tuple3<VertexKey, Long, Long>> out) throws Exception {
+ out.collect(new Tuple3<VertexKey, Long, Long>(inDegree.f0, inDegree.f1, outDegree.f1));
+ }
+ });
+
+ // widen each initial vertex value: value -> (value, inDegree, outDegree)
+ DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees = initialVertices
+ .join(degrees).where(0).equalTo(0)
+ .with(new FlatJoinFunction<Vertex<VertexKey, VertexValue>, Tuple3<VertexKey, Long, Long>, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>() {
+ @Override
+ public void join(Vertex<VertexKey, VertexValue> vertex,
+ Tuple3<VertexKey, Long, Long> vertexDegrees,
+ Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+ Tuple3<VertexValue, Long, Long> widenedValue =
+ new Tuple3<VertexValue, Long, Long>(vertex.getValue(), vertexDegrees.f1, vertexDegrees.f2);
+ out.collect(new Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(), widenedValue));
+ }
+ });
+ TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+
+ final DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
+ Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration = setUpIteration(verticesWithDegrees);
+
+ DataSet<Tuple2<VertexKey, Message>> sentMessages;
+ switch (messagingDirection) {
+ case IN:
+ sentMessages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+ break;
+ case OUT:
+ sentMessages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+ break;
+ case ALL:
+ sentMessages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
+ .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0));
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+
+ // co-group the messages with the solution set on the vertex key to compute the new vertex states
+ VertexUpdateUdf<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message> updateUdf =
+ new VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+ CoGroupOperator<?, ?, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> updates =
+ sentMessages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+ configureUpdateFunction(updates);
+
+ DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> widenedResult = iteration.closeWith(updates, updates);
+
+ // strip the degrees again so that callers only see the original vertex value
+ return widenedResult.map(new MapFunction<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>, Vertex<VertexKey, VertexValue>>() {
+ @Override
+ public Vertex<VertexKey, VertexValue> map(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertex) throws Exception {
+ return new Vertex<VertexKey, VertexValue>(vertex.getId(), vertex.getValue().f0);
+ }
+ });
+ }
+
+ /** Names the update co-group, attaches the configured broadcast sets and declares field 0 (the key) as forwarded. */
+ private <VV> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<VertexKey, VV>> updates) {
+ CoGroupOperator<?, ?, Vertex<VertexKey, VV>> operator = updates.name("Vertex State Updates");
+
+ if (this.configuration != null) {
+ // add every broadcast set listed in the configuration for the update function
+ for (Tuple2<String, DataSet<?>> bcastVar : this.configuration.getUpdateBcastVars()) {
+ operator = operator.withBroadcastSet(bcastVar.f1, bcastVar.f0);
+ }
+ }
+ // the update preserves the vertex key, so field 0 is forwarded from both inputs
+ operator.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 9122053..97a678c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
@@ -39,7 +40,34 @@ import org.apache.flink.util.Collector;
public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
private static final long serialVersionUID = 1L;
-
+
+ // --------------------------------------------------------------------------------------------
+ // Iteration-wide settings: the total number of vertices and whether degrees are attached to
+ // the vertices. The setters are package-private, so only the iteration itself can set them.
+ // --------------------------------------------------------------------------------------------
+
+ private long numberOfVertices;
+
+ private boolean optDegrees;
+
+ /** @return the total number of vertices, or 0 if it has not been set */
+ public long getNumberOfVertices() {
+ return this.numberOfVertices;
+ }
+
+ void setNumberOfVertices(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
+ }
+
+ /** @return true if the iteration runs with in/out degrees attached to the vertex values */
+ public boolean isOptDegrees() {
+ return this.optDegrees;
+ }
+
+ void setOptDegrees(boolean optDegrees) {
+ this.optDegrees = optDegrees;
+ }
+
// --------------------------------------------------------------------------------------------
// Public API Methods
// --------------------------------------------------------------------------------------------
@@ -49,13 +77,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
* the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
*
- * @param vertexKey The key (identifier) of the vertex.
- * @param vertexValue The value (state) of the vertex.
+ * @param vertex The vertex.
* @param inMessages The incoming messages to this vertex.
*
* @throws Exception The computation may throw exceptions, which causes the superstep to fail.
*/
- public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+ public abstract void updateVertex(Vertex<VertexKey, VertexValue> vertex, MessageIterator<Message> inMessages) throws Exception;
/**
* This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -77,8 +104,13 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
* @param newValue The new vertex value.
*/
public void setNewVertexValue(VertexValue newValue) {
- outVal.f1 = newValue;
- out.collect(outVal);
+ if (!isOptDegrees()) {
+ outVal.setValue(newValue);
+ out.collect(outVal);
+ return;
+ }
+ outValWithDegrees.f1.f0 = newValue; // only the value slot changes, the degrees are kept
+ outWithDegrees.collect(outValWithDegrees);
}
/**
@@ -128,18 +160,51 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
// --------------------------------------------------------------------------------------------
private IterationRuntimeContext runtimeContext;
-
+
private Collector<Vertex<VertexKey, VertexValue>> out;
+ private Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees; // used instead of 'out' when isOptDegrees() is true
+ // reusable output record for the simple mode (isOptDegrees() == false); overwritten by setNewVertexValue
private Vertex<VertexKey, VertexValue> outVal;
-
-
+ // reusable output record for the degrees mode: (id, (value, inDegree, outDegree))
+ private Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees;
+
+
void init(IterationRuntimeContext context) {
this.runtimeContext = context;
}
-
- void setOutput(Vertex<VertexKey, VertexValue> val, Collector<Vertex<VertexKey, VertexValue>> out) {
+
+
+ /** Installs the output record and collector used by setNewVertexValue when isOptDegrees() is true. */
+ void setOutputWithDegrees(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees,
+ Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees) {
+ this.outValWithDegrees = outValWithDegrees;
+ this.outWithDegrees = outWithDegrees;
+ }
+ /** Installs the output record and collector used by setNewVertexValue when isOptDegrees() is false. */
+ void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey, VertexValue>> out) {
+ this.outVal = outVal;
this.out = out;
- this.outVal = val;
+ }
+
+ /**
+ * Entry point used by {@link org.apache.flink.graph.spargel.VertexCentricIteration} when degrees are enabled.
+ * It unpacks the internal (value, inDegree, outDegree) state into a plain vertex with its in- and out-degree set,
+ * so that the user's {@link #updateVertex(Vertex, MessageIterator)} never sees the Tuple3.
+ *
+ * @param vertexState the vertex as stored in the iteration, holding its value, in-degree and out-degree
+ * @param inMessages the incoming messages to this vertex
+ * @throws Exception any exception thrown by the user's updateVertex implementation
+ */
+ void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
+ MessageIterator<Message> inMessages) throws Exception {
+ final Tuple3<VertexValue, Long, Long> valueAndDegrees = vertexState.getValue();
+
+ final Vertex<VertexKey, VertexValue> userVertex =
+ new Vertex<VertexKey, VertexValue>(vertexState.getId(), valueAndDegrees.f0);
+ userVertex.setInDegree(valueAndDegrees.f1);
+ userVertex.setOutDegree(valueAndDegrees.f2);
+
+ updateVertex(userVertex, inMessages);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index d84952a..e93f581 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
@@ -58,20 +59,20 @@ public class CollectionModeSuperstepITCase {
public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
@Override
- public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { // checks value == superstep, then increments it
long superstep = getSuperstepNumber();
- Assert.assertEquals(true, vertexValue == superstep);
- setNewVertexValue(vertexValue + 1);
+ Assert.assertEquals(superstep, vertex.getValue().longValue()); // on failure, reports both expected and actual values
+ setNewVertexValue(vertex.getValue() + 1);
}
}
public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
@Override
- public void sendMessages(Long vertexId, Long vertexValue) {
+ public void sendMessages(Vertex<Long, Long> vertex) { // checks value == superstep, then messages all neighbors
long superstep = getSuperstepNumber();
- Assert.assertEquals(true, vertexValue == superstep);
+ Assert.assertEquals(superstep, vertex.getValue().longValue()); // on failure, reports both expected and actual values
//send message to keep vertices active
- sendMessageToAllNeighbors(vertexValue);
+ sendMessageToAllNeighbors(vertex.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 4e8412c..35b8233 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -150,10 +150,10 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Override
- public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) { // the vertex (id and value) now arrives as one object
long superstep = getSuperstepNumber();
aggregator.aggregate(superstep);
- setNewVertexValue(vertexValue + 1);
+ setNewVertexValue(vertex.getValue() + 1); // a changed value triggers the MessagingFunction for this vertex
}
}
@@ -178,9 +178,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
}
@Override
- public void sendMessages(Long vertexId, Long vertexValue) {
+ public void sendMessages(Vertex<Long, Long> vertex) { // the vertex (id and value) now arrives as one object
//send message to keep vertices active
- sendMessageToAllNeighbors(vertexValue);
+ sendMessageToAllNeighbors(vertex.getValue()); // payload is the vertex's current value
}
}
@@ -188,8 +188,8 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
@Override
- public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
- setNewVertexValue(vertexValue + 1);
+ public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+ setNewVertexValue(vertex.getValue() + 1); // ignores the messages and simply increments the value
}
}
@@ -197,9 +197,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
@Override
- public void sendMessages(Long vertexId, Long vertexValue) {
+ public void sendMessages(Vertex<Long, Long> vertex) {
//send message to keep vertices active
- sendMessageToAllNeighbors(vertexValue);
+ sendMessageToAllNeighbors(vertex.getValue()); // payload is the vertex's current value
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
index f2f3d8c..17a4cb0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -66,7 +66,6 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
result.writeAsCsv(resultPath, "\n", " ");
env.execute();
}
-
/**
* A map function that takes a Long value and creates a 2-tuple out of it:
* <pre>(Long value) -> (value, value)</pre>
http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..58738d2
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs {@link IncrementalSSSPExample} on the {@link IncrementalSSSPData} inputs for each
+ * {@code TestExecutionMode} supplied by {@link MultipleProgramsTestBase} and checks the output.
+ */
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+ private String verticesPath;
+ private String edgesPath;
+ private String edgesInSSSPPath;
+ private String resultPath;
+
+ // expected output; set by the test method and verified in after()
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public IncrementalSSSPITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ verticesPath = writeToTempFile(IncrementalSSSPData.VERTICES);
+ edgesPath = writeToTempFile(IncrementalSSSPData.EDGES);
+ edgesInSSSPPath = writeToTempFile(IncrementalSSSPData.EDGES_IN_SSSP);
+ }
+
+ /** Writes the content as UTF-8 into a new temporary file and returns the file's URI string. */
+ private String writeToTempFile(CharSequence content) throws Exception {
+ File file = tempFolder.newFile();
+ Files.write(content, file, Charsets.UTF_8);
+ return file.toURI().toString();
+ }
+
+ @Test
+ public void testIncrementalSSSPExample() throws Exception {
+ IncrementalSSSPExample.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
+ IncrementalSSSPData.EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
+ expected = IncrementalSSSPData.RESULTED_VERTICES;
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+}