You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 17:30:13 UTC

[2/2] flink git commit: [FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank

[FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank

This closes #462


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1daadcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1daadcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1daadcb

Branch: refs/heads/master
Commit: a1daadcba168e7963c8e532b84f7d9613f86b00b
Parents: 9db170f
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 9 11:41:58 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:15:38 2015 +0200

----------------------------------------------------------------------
 docs/gelly_guide.md                             |  6 +--
 .../main/java/org/apache/flink/graph/Graph.java |  4 +-
 .../org/apache/flink/graph/GraphAlgorithm.java  |  2 +-
 .../flink/graph/example/GraphMetrics.java       | 22 ++++------
 .../apache/flink/graph/library/PageRank.java    | 42 +++++---------------
 .../flink/graph/validation/GraphValidator.java  |  2 +-
 .../validation/InvalidVertexIdsValidator.java   | 10 +----
 7 files changed, 26 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 30e8315..8058e0a 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -160,10 +160,10 @@ DataSet<Tuple2<K, Long>> outDegrees()
 DataSet<Tuple2<K, Long>> getDegrees()
 
 // get the number of vertices
-DataSet<Integer> numberOfVertices()
+long numberOfVertices()
 
 // get the number of edges
-DataSet<Integer> numberOfEdges()
+long numberOfEdges()
 
 {% endhighlight %}
 
@@ -386,7 +386,7 @@ List<Vertex<Long, Long>> vertices = ...
 // create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
 List<Edge<Long, Long>> edges = ...
 
-Graph<Long, Long, Long> graph = Graph.fromcollection(vertices, edges, env);
+Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
 
 // will return false: 6 is an invalid ID
 graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 91620cc..334d5d3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -926,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *            the maximum number of iterations for the inner delta iteration
 	 * @return true if the graph is weakly connected.
 	 */
-	public Boolean isWeaklyConnected(int maxIterations) throws Exception {
+	public boolean isWeaklyConnected(int maxIterations) throws Exception {
 		// first, convert to an undirected graph
 		Graph<K, VV, EV> graph = this.getUndirected();
 
@@ -1145,7 +1145,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
 
-	public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) {
+	public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
 		return algorithm.run(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index f5e7018..ceeeaf4 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
  */
 public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
 
-	public Graph<K, VV, EV> run(Graph<K, VV, EV> input);
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 1977255..c6a776d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -18,16 +18,12 @@
 
 package org.apache.flink.graph.example;
 
-import java.util.Collection;
-
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
@@ -72,8 +68,7 @@ public class GraphMetrics implements ProgramDescription {
 		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
 
 		DataSet<Double> avgNodeDegree = verticesWithDegrees
-				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
-				.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
+				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
 		
 		/** find the vertex with the maximum in-degree **/
 		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -100,17 +95,14 @@ public class GraphMetrics implements ProgramDescription {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
+	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
 
-		private int numberOfVertices;
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			Collection<Integer> bCastSet = getRuntimeContext()
-					.getBroadcastVariable("numberOfVertices");
-			numberOfVertices = bCastSet.iterator().next();
+		private long numberOfVertices;
+
+		public AvgNodeDegreeMapper(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
 		}
-		
+
 		public Double map(Tuple2<Long, Long> sumTuple) {
 			return (double) (sumTuple.f1 / numberOfVertices) ;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 3e6610e..00ae204 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,11 +20,6 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
-<<<<<<< HEAD
-import org.apache.flink.api.java.DataSet;
-=======
-import org.apache.flink.api.java.ExecutionEnvironment;
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -45,25 +40,13 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
-		DataSet<Integer> numberOfVertices = network.numberOfVertices();
+		final long numberOfVertices = network.numberOfVertices();
 
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
-				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-<<<<<<< HEAD
-
-		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
-		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
-
-=======
-		try {
-			iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
-					ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
+				new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+				maxIterations);
 		return network.runVertexCentricIteration(iteration);
 	}
 
@@ -76,15 +59,11 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 			extends VertexUpdateFunction<K, Double, Double> {
 
 		private final double beta;
-		private int numVertices;
+		private final long numVertices;
 		
-		public VertexRankUpdater(double beta) {
+		public VertexRankUpdater(double beta, long numberOfVertices) {
 			this.beta = beta;
-		}
-		
-		@Override
-		public void preSuperstep(){
-			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+			this.numVertices = numberOfVertices;
 		}
 
 		@Override
@@ -110,11 +89,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class RankMessenger<K extends Comparable<K> & Serializable>
 			extends MessagingFunction<K, Double, Double, Double> {
 
-		private int numVertices;
+		private final long numVertices;
 
-		@Override
-		public void preSuperstep(){
-			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+		public RankMessenger(long numberOfVertices) {
+			this.numVertices = numberOfVertices;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 339e38d..101e82c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -33,6 +33,6 @@ import org.apache.flink.graph.Graph;
 public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 		implements Serializable {
 
-	public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
+	public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index aeca482..cc06ca7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -38,11 +38,11 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
 	 * also exist in the vertex input set.
 	 * 
-	 * @return a Boolean stating whether a graph is valid
+	 * @return a boolean stating whether a graph is valid
 	 *         with respect to its vertex ids.
 	 */
 	@Override
-	public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
+	public boolean validate(Graph<K, VV, EV> graph) throws Exception {
 		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
 				.flatMap(new MapEdgeIds<K, EV>()).distinct();
 		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
@@ -76,10 +76,4 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 		}
 	}
 
-	private static final class InvalidIdsMap implements	MapFunction<Integer, Boolean> {
-		public Boolean map(Integer numberOfInvalidIds) throws Exception {
-			return numberOfInvalidIds == 0;
-		}
-	}
-
 }
\ No newline at end of file