You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 17:30:12 UTC

[1/2] flink git commit: [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()

Repository: flink
Updated Branches:
  refs/heads/master 9077a53bf -> a1daadcba


[FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9db170fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9db170fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9db170fc

Branch: refs/heads/master
Commit: 9db170fc31af7a7ddfe6e77d55006efbefd91800
Parents: 9077a53
Author: andralungu <lu...@gmail.com>
Authored: Sat Mar 7 17:17:28 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:04:48 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 28 ++++------
 .../flink/graph/example/GraphMetrics.java       | 10 ++--
 .../apache/flink/graph/library/PageRank.java    | 13 +++++
 .../apache/flink/graph/utils/GraphUtils.java    | 57 --------------------
 .../flink/graph/validation/GraphValidator.java  |  3 +-
 .../validation/InvalidVertexIdsValidator.java   |  8 ++-
 .../flink/graph/test/WeaklyConnectedITCase.java |  8 +--
 .../test/operations/GraphCreationITCase.java    |  8 +--
 .../test/operations/GraphOperationsITCase.java  |  4 +-
 9 files changed, 41 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f7c21d0..91620cc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -50,7 +50,6 @@ import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -293,7 +292,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * 
 	 * @return true if the Graph is valid.
 	 */
-	public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
+	public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
 		return validator.validate(this);
 	}
 
@@ -877,17 +876,17 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * @return Singleton DataSet containing the vertex count
+	 * @return a long integer representing the number of vertices
 	 */
-	public DataSet<Integer> numberOfVertices() {
-		return GraphUtils.count(vertices, context);
+	public long numberOfVertices() throws Exception {
+		return vertices.count();
 	}
 
 	/**
-	 * @return Singleton DataSet containing the edge count
+	 * @return a long integer representing the number of edges
 	 */
-	public DataSet<Integer> numberOfEdges() {
-		return GraphUtils.count(edges, context);
+	public long numberOfEdges() throws Exception {
+		return edges.count();
 	}
 
 	/**
@@ -927,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *            the maximum number of iterations for the inner delta iteration
 	 * @return true if the graph is weakly connected.
 	 */
-	public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
+	public Boolean isWeaklyConnected(int maxIterations) throws Exception {
 		// first, convert to an undirected graph
 		Graph<K, VV, EV> graph = this.getUndirected();
 
@@ -948,9 +947,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 				.with(new VertexWithNewComponentJoin<K>());
 
 		DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
-		DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()),
-				context).map(new CheckIfOneComponentMapper());
-		return result;
+		return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
 	}
 
 	private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> {
@@ -983,13 +980,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer, Boolean> {
-		@Override
-		public Boolean map(Integer n) {
-			return (n == 1);
-		}
-	}
-
 	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index a5ddf2a..1977255 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -63,17 +63,17 @@ public class GraphMetrics implements ProgramDescription {
 		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
 		
 		/** get the number of vertices **/
-		DataSet<Integer> numVertices = graph.numberOfVertices();
+		long numVertices = graph.numberOfVertices();
 		
 		/** get the number of edges **/
-		DataSet<Integer> numEdges = graph.numberOfEdges();
+		long numEdges = graph.numberOfEdges();
 		
 		/** compute the average node degree **/
 		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
 
 		DataSet<Double> avgNodeDegree = verticesWithDegrees
 				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
-				.withBroadcastSet(numVertices, "numberOfVertices");
+				.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
 		
 		/** find the vertex with the maximum in-degree **/
 		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -88,8 +88,8 @@ public class GraphMetrics implements ProgramDescription {
 		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
 		
 		/** print the results **/
-		ExampleUtils.printResult(numVertices, "Total number of vertices");
-		ExampleUtils.printResult(numEdges, "Total number of edges");
+		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
+		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
 		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
 		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
 		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index e06e64f..3e6610e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,7 +20,11 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
+<<<<<<< HEAD
 import org.apache.flink.api.java.DataSet;
+=======
+import org.apache.flink.api.java.ExecutionEnvironment;
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -47,10 +51,19 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
 				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
+<<<<<<< HEAD
 
 		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
 		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
 
+=======
+		try {
+			iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
+					ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
 		return network.runVertexCentricIteration(iteration);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
deleted file mode 100644
index aba1c14..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-
-@SuppressWarnings("serial")
-public class GraphUtils {
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public static DataSet<Integer> count(DataSet set, ExecutionEnvironment env) {
-		List<Integer> list = new ArrayList<Integer>();
-		list.add(0);
-		DataSet<Integer> initialCount = env.fromCollection(list);
-		return set.map(new OneMapper()).union(initialCount)
-				.reduce(new AddOnesReducer()).first(1);
-	}
-
-	private static final class OneMapper<T extends Tuple> implements
-			MapFunction<T, Integer> {
-		@Override
-		public Integer map(T o) throws Exception {
-			return 1;
-		}
-	}
-
-	private static final class AddOnesReducer implements
-			ReduceFunction<Integer> {
-		@Override
-		public Integer reduce(Integer one, Integer two) throws Exception {
-			return one + two;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 04883be..339e38d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.validation;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 
 /**
@@ -34,6 +33,6 @@ import org.apache.flink.graph.Graph;
 public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 		implements Serializable {
 
-	public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
+	public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index b043f3c..aeca482 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
@@ -39,18 +38,17 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
 	 * also exist in the vertex input set.
 	 * 
-	 * @return a singleton DataSet<Boolean> stating whether a graph is valid
+	 * @return a Boolean stating whether a graph is valid
 	 *         with respect to its vertex ids.
 	 */
 	@Override
-	public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
+	public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
 		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
 				.flatMap(new MapEdgeIds<K, EV>()).distinct();
 		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
 				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
 
-		return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
-				graph.getContext()).map(new InvalidIdsMap());
+		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
 	}
 
 	private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable>

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
index 1628952..9db449e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
@@ -62,7 +62,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "true\n";
@@ -78,7 +78,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "false\n";
@@ -94,7 +94,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "true\n";
@@ -110,7 +110,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
 		
-		graph.isWeaklyConnected(10).writeAsText(resultPath);
+		env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
 		
 		env.execute();
 		expectedResult = "false\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index dc0e5d2..3fe69fd 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -123,9 +123,9 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
 
-		result.writeAsText(resultPath);
+		env.fromElements(result).writeAsText(resultPath);
 		env.execute();
 
 		expectedResult = "true\n";
@@ -141,8 +141,8 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
-		DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
-		result.writeAsText(resultPath);
+		Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+		env.fromElements(result).writeAsText(resultPath);
 		env.execute();
 
 		expectedResult = "false\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index e7f067a..6210f43 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -180,7 +180,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfVertices().writeAsText(resultPath);
+		env.fromElements(graph.numberOfVertices()).writeAsText(resultPath);
 
 		env.execute();
 		expectedResult = "5";
@@ -195,7 +195,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph.numberOfEdges().writeAsText(resultPath);
+		env.fromElements(graph.numberOfEdges()).writeAsText(resultPath);
 
 		env.execute();
 		expectedResult = "7";


[2/2] flink git commit: [FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank

Posted by va...@apache.org.
[FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank

This closes #462


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1daadcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1daadcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1daadcb

Branch: refs/heads/master
Commit: a1daadcba168e7963c8e532b84f7d9613f86b00b
Parents: 9db170f
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 9 11:41:58 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:15:38 2015 +0200

----------------------------------------------------------------------
 docs/gelly_guide.md                             |  6 +--
 .../main/java/org/apache/flink/graph/Graph.java |  4 +-
 .../org/apache/flink/graph/GraphAlgorithm.java  |  2 +-
 .../flink/graph/example/GraphMetrics.java       | 22 ++++------
 .../apache/flink/graph/library/PageRank.java    | 42 +++++---------------
 .../flink/graph/validation/GraphValidator.java  |  2 +-
 .../validation/InvalidVertexIdsValidator.java   | 10 +----
 7 files changed, 26 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 30e8315..8058e0a 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -160,10 +160,10 @@ DataSet<Tuple2<K, Long>> outDegrees()
 DataSet<Tuple2<K, Long>> getDegrees()
 
 // get the number of vertices
-DataSet<Integer> numberOfVertices()
+long numberOfVertices()
 
 // get the number of edges
-DataSet<Integer> numberOfEdges()
+long numberOfEdges()
 
 {% endhighlight %}
 
@@ -386,7 +386,7 @@ List<Vertex<Long, Long>> vertices = ...
 // create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
 List<Edge<Long, Long>> edges = ...
 
-Graph<Long, Long, Long> graph = Graph.fromcollection(vertices, edges, env);
+Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
 
 // will return false: 6 is an invalid ID
 graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 91620cc..334d5d3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -926,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *            the maximum number of iterations for the inner delta iteration
 	 * @return true if the graph is weakly connected.
 	 */
-	public Boolean isWeaklyConnected(int maxIterations) throws Exception {
+	public boolean isWeaklyConnected(int maxIterations) throws Exception {
 		// first, convert to an undirected graph
 		Graph<K, VV, EV> graph = this.getUndirected();
 
@@ -1145,7 +1145,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
 
-	public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) {
+	public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
 		return algorithm.run(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index f5e7018..ceeeaf4 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
  */
 public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
 
-	public Graph<K, VV, EV> run(Graph<K, VV, EV> input);
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 1977255..c6a776d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -18,16 +18,12 @@
 
 package org.apache.flink.graph.example;
 
-import java.util.Collection;
-
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
@@ -72,8 +68,7 @@ public class GraphMetrics implements ProgramDescription {
 		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
 
 		DataSet<Double> avgNodeDegree = verticesWithDegrees
-				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
-				.withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
+				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
 		
 		/** find the vertex with the maximum in-degree **/
 		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -100,17 +95,14 @@ public class GraphMetrics implements ProgramDescription {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
+	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
 
-		private int numberOfVertices;
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			Collection<Integer> bCastSet = getRuntimeContext()
-					.getBroadcastVariable("numberOfVertices");
-			numberOfVertices = bCastSet.iterator().next();
+		private long numberOfVertices;
+
+		public AvgNodeDegreeMapper(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
 		}
-		
+
 		public Double map(Tuple2<Long, Long> sumTuple) {
 			return (double) (sumTuple.f1 / numberOfVertices) ;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 3e6610e..00ae204 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,11 +20,6 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
-<<<<<<< HEAD
-import org.apache.flink.api.java.DataSet;
-=======
-import org.apache.flink.api.java.ExecutionEnvironment;
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -45,25 +40,13 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
-		DataSet<Integer> numberOfVertices = network.numberOfVertices();
+		final long numberOfVertices = network.numberOfVertices();
 
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
-				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-<<<<<<< HEAD
-
-		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
-		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
-
-=======
-		try {
-			iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
-					ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
+				new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+				maxIterations);
 		return network.runVertexCentricIteration(iteration);
 	}
 
@@ -76,15 +59,11 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 			extends VertexUpdateFunction<K, Double, Double> {
 
 		private final double beta;
-		private int numVertices;
+		private final long numVertices;
 		
-		public VertexRankUpdater(double beta) {
+		public VertexRankUpdater(double beta, long numberOfVertices) {
 			this.beta = beta;
-		}
-		
-		@Override
-		public void preSuperstep(){
-			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+			this.numVertices = numberOfVertices;
 		}
 
 		@Override
@@ -110,11 +89,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class RankMessenger<K extends Comparable<K> & Serializable>
 			extends MessagingFunction<K, Double, Double, Double> {
 
-		private int numVertices;
+		private final long numVertices;
 
-		@Override
-		public void preSuperstep(){
-			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+		public RankMessenger(long numberOfVertices) {
+			this.numVertices = numberOfVertices;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 339e38d..101e82c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -33,6 +33,6 @@ import org.apache.flink.graph.Graph;
 public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 		implements Serializable {
 
-	public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
+	public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index aeca482..cc06ca7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -38,11 +38,11 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
 	 * also exist in the vertex input set.
 	 * 
-	 * @return a Boolean stating whether a graph is valid
+	 * @return a boolean stating whether a graph is valid
 	 *         with respect to its vertex ids.
 	 */
 	@Override
-	public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
+	public boolean validate(Graph<K, VV, EV> graph) throws Exception {
 		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
 				.flatMap(new MapEdgeIds<K, EV>()).distinct();
 		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
@@ -76,10 +76,4 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 		}
 	}
 
-	private static final class InvalidIdsMap implements	MapFunction<Integer, Boolean> {
-		public Boolean map(Integer numberOfInvalidIds) throws Exception {
-			return numberOfInvalidIds == 0;
-		}
-	}
-
 }
\ No newline at end of file