You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 17:30:13 UTC
[2/2] flink git commit: [FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank
[FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank
This closes #462
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1daadcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1daadcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1daadcb
Branch: refs/heads/master
Commit: a1daadcba168e7963c8e532b84f7d9613f86b00b
Parents: 9db170f
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 9 11:41:58 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:15:38 2015 +0200
----------------------------------------------------------------------
docs/gelly_guide.md | 6 +--
.../main/java/org/apache/flink/graph/Graph.java | 4 +-
.../org/apache/flink/graph/GraphAlgorithm.java | 2 +-
.../flink/graph/example/GraphMetrics.java | 22 ++++------
.../apache/flink/graph/library/PageRank.java | 42 +++++---------------
.../flink/graph/validation/GraphValidator.java | 2 +-
.../validation/InvalidVertexIdsValidator.java | 10 +----
7 files changed, 26 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 30e8315..8058e0a 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -160,10 +160,10 @@ DataSet<Tuple2<K, Long>> outDegrees()
DataSet<Tuple2<K, Long>> getDegrees()
// get the number of vertices
-DataSet<Integer> numberOfVertices()
+long numberOfVertices()
// get the number of edges
-DataSet<Integer> numberOfEdges()
+long numberOfEdges()
{% endhighlight %}
@@ -386,7 +386,7 @@ List<Vertex<Long, Long>> vertices = ...
// create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
List<Edge<Long, Long>> edges = ...
-Graph<Long, Long, Long> graph = Graph.fromcollection(vertices, edges, env);
+Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
// will return false: 6 is an invalid ID
graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 91620cc..334d5d3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -926,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected.
*/
- public Boolean isWeaklyConnected(int maxIterations) throws Exception {
+ public boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected();
@@ -1145,7 +1145,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
- public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) {
+ public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
return algorithm.run(this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index f5e7018..ceeeaf4 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
*/
public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
- public Graph<K, VV, EV> run(Graph<K, VV, EV> input);
+ public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 1977255..c6a776d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -18,16 +18,12 @@
package org.apache.flink.graph.example;
-import java.util.Collection;
-
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.example.utils.ExampleUtils;
@@ -72,8 +68,7 @@ public class GraphMetrics implements ProgramDescription {
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
DataSet<Double> avgNodeDegree = verticesWithDegrees
- .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
- .withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
+ .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
/** find the vertex with the maximum in-degree **/
DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -100,17 +95,14 @@ public class GraphMetrics implements ProgramDescription {
}
@SuppressWarnings("serial")
- private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
+ private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
- private int numberOfVertices;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- Collection<Integer> bCastSet = getRuntimeContext()
- .getBroadcastVariable("numberOfVertices");
- numberOfVertices = bCastSet.iterator().next();
+ private long numberOfVertices;
+
+ public AvgNodeDegreeMapper(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
}
-
+
public Double map(Tuple2<Long, Long> sumTuple) {
return (double) (sumTuple.f1 / numberOfVertices) ;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 3e6610e..00ae204 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,11 +20,6 @@ package org.apache.flink.graph.library;
import java.io.Serializable;
-<<<<<<< HEAD
-import org.apache.flink.api.java.DataSet;
-=======
-import org.apache.flink.api.java.ExecutionEnvironment;
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
@@ -45,25 +40,13 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
- DataSet<Integer> numberOfVertices = network.numberOfVertices();
+ final long numberOfVertices = network.numberOfVertices();
VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
- new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-<<<<<<< HEAD
-
- iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
- iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
-
-=======
- try {
- iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
- ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
- } catch (Exception e) {
- e.printStackTrace();
- }
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
+ new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+ maxIterations);
return network.runVertexCentricIteration(iteration);
}
@@ -76,15 +59,11 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
extends VertexUpdateFunction<K, Double, Double> {
private final double beta;
- private int numVertices;
+ private final long numVertices;
- public VertexRankUpdater(double beta) {
+ public VertexRankUpdater(double beta, long numberOfVertices) {
this.beta = beta;
- }
-
- @Override
- public void preSuperstep(){
- numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+ this.numVertices = numberOfVertices;
}
@Override
@@ -110,11 +89,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
public static final class RankMessenger<K extends Comparable<K> & Serializable>
extends MessagingFunction<K, Double, Double, Double> {
- private int numVertices;
+ private final long numVertices;
- @Override
- public void preSuperstep(){
- numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+ public RankMessenger(long numberOfVertices) {
+ this.numVertices = numberOfVertices;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 339e38d..101e82c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -33,6 +33,6 @@ import org.apache.flink.graph.Graph;
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements Serializable {
- public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
+ public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index aeca482..cc06ca7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -38,11 +38,11 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
* Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set.
*
- * @return a Boolean stating whether a graph is valid
+ * @return a boolean stating whether a graph is valid
* with respect to its vertex ids.
*/
@Override
- public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
+ public boolean validate(Graph<K, VV, EV> graph) throws Exception {
DataSet<Tuple1<K>> edgeIds = graph.getEdges()
.flatMap(new MapEdgeIds<K, EV>()).distinct();
DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
@@ -76,10 +76,4 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
}
}
- private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> {
- public Boolean map(Integer numberOfInvalidIds) throws Exception {
- return numberOfInvalidIds == 0;
- }
- }
-
}
\ No newline at end of file