You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 17:30:12 UTC
[1/2] flink git commit: [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
Repository: flink
Updated Branches:
refs/heads/master 9077a53bf -> a1daadcba
[FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9db170fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9db170fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9db170fc
Branch: refs/heads/master
Commit: 9db170fc31af7a7ddfe6e77d55006efbefd91800
Parents: 9077a53
Author: andralungu <lu...@gmail.com>
Authored: Sat Mar 7 17:17:28 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:04:48 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Graph.java | 28 ++++------
.../flink/graph/example/GraphMetrics.java | 10 ++--
.../apache/flink/graph/library/PageRank.java | 13 +++++
.../apache/flink/graph/utils/GraphUtils.java | 57 --------------------
.../flink/graph/validation/GraphValidator.java | 3 +-
.../validation/InvalidVertexIdsValidator.java | 8 ++-
.../flink/graph/test/WeaklyConnectedITCase.java | 8 +--
.../test/operations/GraphCreationITCase.java | 8 +--
.../test/operations/GraphOperationsITCase.java | 4 +-
9 files changed, 41 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f7c21d0..91620cc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -50,7 +50,6 @@ import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -293,7 +292,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
*
* @return true if the Graph is valid.
*/
- public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
+ public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
return validator.validate(this);
}
@@ -877,17 +876,17 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
- * @return Singleton DataSet containing the vertex count
+ * @return a long integer representing the number of vertices
*/
- public DataSet<Integer> numberOfVertices() {
- return GraphUtils.count(vertices, context);
+ public long numberOfVertices() throws Exception {
+ return vertices.count();
}
/**
- * @return Singleton DataSet containing the edge count
+ * @return a long integer representing the number of edges
*/
- public DataSet<Integer> numberOfEdges() {
- return GraphUtils.count(edges, context);
+ public long numberOfEdges() throws Exception {
+ return edges.count();
}
/**
@@ -927,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected.
*/
- public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
+ public Boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected();
@@ -948,9 +947,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
.with(new VertexWithNewComponentJoin<K>());
DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);
- DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()),
- context).map(new CheckIfOneComponentMapper());
- return result;
+ return components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()).count() == 1;
}
private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> {
@@ -983,13 +980,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
- private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
- @Override
- public Boolean map(Integer n) {
- return (n == 1);
- }
- }
-
/**
* Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index a5ddf2a..1977255 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -63,17 +63,17 @@ public class GraphMetrics implements ProgramDescription {
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
/** get the number of vertices **/
- DataSet<Integer> numVertices = graph.numberOfVertices();
+ long numVertices = graph.numberOfVertices();
/** get the number of edges **/
- DataSet<Integer> numEdges = graph.numberOfEdges();
+ long numEdges = graph.numberOfEdges();
/** compute the average node degree **/
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
- .withBroadcastSet(numVertices, "numberOfVertices");
+ .withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
/** find the vertex with the maximum in-degree **/
DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -88,8 +88,8 @@ public class GraphMetrics implements ProgramDescription {
DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
/** print the results **/
- ExampleUtils.printResult(numVertices, "Total number of vertices");
- ExampleUtils.printResult(numEdges, "Total number of edges");
+ ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
+ ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
ExampleUtils.printResult(avgNodeDegree, "Average node degree");
ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index e06e64f..3e6610e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,7 +20,11 @@ package org.apache.flink.graph.library;
import java.io.Serializable;
+<<<<<<< HEAD
import org.apache.flink.api.java.DataSet;
+=======
+import org.apache.flink.api.java.ExecutionEnvironment;
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
@@ -47,10 +51,19 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
+<<<<<<< HEAD
iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
+=======
+ try {
+ iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
+ ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+>>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
return network.runVertexCentricIteration(iteration);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
deleted file mode 100644
index aba1c14..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple;
-
-@SuppressWarnings("serial")
-public class GraphUtils {
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public static DataSet<Integer> count(DataSet set, ExecutionEnvironment env) {
- List<Integer> list = new ArrayList<Integer>();
- list.add(0);
- DataSet<Integer> initialCount = env.fromCollection(list);
- return set.map(new OneMapper()).union(initialCount)
- .reduce(new AddOnesReducer()).first(1);
- }
-
- private static final class OneMapper<T extends Tuple> implements
- MapFunction<T, Integer> {
- @Override
- public Integer map(T o) throws Exception {
- return 1;
- }
- }
-
- private static final class AddOnesReducer implements
- ReduceFunction<Integer> {
- @Override
- public Integer reduce(Integer one, Integer two) throws Exception {
- return one + two;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 04883be..339e38d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.validation;
import java.io.Serializable;
-import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
/**
@@ -34,6 +33,6 @@ import org.apache.flink.graph.Graph;
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements Serializable {
- public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
+ public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index b043f3c..aeca482 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.util.Collector;
import java.io.Serializable;
@@ -39,18 +38,17 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
* Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set.
*
- * @return a singleton DataSet<Boolean> stating whether a graph is valid
+ * @return a Boolean stating whether a graph is valid
* with respect to its vertex ids.
*/
@Override
- public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
+ public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
DataSet<Tuple1<K>> edgeIds = graph.getEdges()
.flatMap(new MapEdgeIds<K, EV>()).distinct();
DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
- return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
- graph.getContext()).map(new InvalidIdsMap());
+ return invalidIds.map(new KToTupleMap<K>()).count() == 0;
}
private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable>
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
index 1628952..9db449e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java
@@ -62,7 +62,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- graph.isWeaklyConnected(10).writeAsText(resultPath);
+ env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "true\n";
@@ -78,7 +78,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
- graph.isWeaklyConnected(10).writeAsText(resultPath);
+ env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "false\n";
@@ -94,7 +94,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env).getUndirected();
- graph.isWeaklyConnected(10).writeAsText(resultPath);
+ env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "true\n";
@@ -110,7 +110,7 @@ public class WeaklyConnectedITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected();
- graph.isWeaklyConnected(10).writeAsText(resultPath);
+ env.fromElements(graph.isWeaklyConnected(10)).writeAsText(resultPath);
env.execute();
expectedResult = "false\n";
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index dc0e5d2..3fe69fd 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -123,9 +123,9 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
- DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+ Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
- result.writeAsText(resultPath);
+ env.fromElements(result).writeAsText(resultPath);
env.execute();
expectedResult = "true\n";
@@ -141,8 +141,8 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
- DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
- result.writeAsText(resultPath);
+ Boolean result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+ env.fromElements(result).writeAsText(resultPath);
env.execute();
expectedResult = "false\n";
http://git-wip-us.apache.org/repos/asf/flink/blob/9db170fc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index e7f067a..6210f43 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -180,7 +180,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- graph.numberOfVertices().writeAsText(resultPath);
+ env.fromElements(graph.numberOfVertices()).writeAsText(resultPath);
env.execute();
expectedResult = "5";
@@ -195,7 +195,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- graph.numberOfEdges().writeAsText(resultPath);
+ env.fromElements(graph.numberOfEdges()).writeAsText(resultPath);
env.execute();
expectedResult = "7";
[2/2] flink git commit: [FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank
Posted by va...@apache.org.
[FLINK-1632][gelly] Removed bcast var in GraphMetrics and PageRank
This closes #462
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1daadcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1daadcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1daadcb
Branch: refs/heads/master
Commit: a1daadcba168e7963c8e532b84f7d9613f86b00b
Parents: 9db170f
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 9 11:41:58 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 18:15:38 2015 +0200
----------------------------------------------------------------------
docs/gelly_guide.md | 6 +--
.../main/java/org/apache/flink/graph/Graph.java | 4 +-
.../org/apache/flink/graph/GraphAlgorithm.java | 2 +-
.../flink/graph/example/GraphMetrics.java | 22 ++++------
.../apache/flink/graph/library/PageRank.java | 42 +++++---------------
.../flink/graph/validation/GraphValidator.java | 2 +-
.../validation/InvalidVertexIdsValidator.java | 10 +----
7 files changed, 26 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 30e8315..8058e0a 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -160,10 +160,10 @@ DataSet<Tuple2<K, Long>> outDegrees()
DataSet<Tuple2<K, Long>> getDegrees()
// get the number of vertices
-DataSet<Integer> numberOfVertices()
+long numberOfVertices()
// get the number of edges
-DataSet<Integer> numberOfEdges()
+long numberOfEdges()
{% endhighlight %}
@@ -386,7 +386,7 @@ List<Vertex<Long, Long>> vertices = ...
// create a list of edges with IDs = {(1, 2) (1, 3), (2, 4), (5, 6)}
List<Edge<Long, Long>> edges = ...
-Graph<Long, Long, Long> graph = Graph.fromcollection(vertices, edges, env);
+Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
// will return false: 6 is an invalid ID
graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 91620cc..334d5d3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -926,7 +926,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* the maximum number of iterations for the inner delta iteration
* @return true if the graph is weakly connected.
*/
- public Boolean isWeaklyConnected(int maxIterations) throws Exception {
+ public boolean isWeaklyConnected(int maxIterations) throws Exception {
// first, convert to an undirected graph
Graph<K, VV, EV> graph = this.getUndirected();
@@ -1145,7 +1145,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
- public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) {
+ public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
return algorithm.run(this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index f5e7018..ceeeaf4 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -27,5 +27,5 @@ import java.io.Serializable;
*/
public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> {
- public Graph<K, VV, EV> run(Graph<K, VV, EV> input);
+ public Graph<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 1977255..c6a776d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -18,16 +18,12 @@
package org.apache.flink.graph.example;
-import java.util.Collection;
-
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.example.utils.ExampleUtils;
@@ -72,8 +68,7 @@ public class GraphMetrics implements ProgramDescription {
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
DataSet<Double> avgNodeDegree = verticesWithDegrees
- .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper())
- .withBroadcastSet(env.fromElements(numVertices), "numberOfVertices");
+ .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
/** find the vertex with the maximum in-degree **/
DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
@@ -100,17 +95,14 @@ public class GraphMetrics implements ProgramDescription {
}
@SuppressWarnings("serial")
- private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
+ private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
- private int numberOfVertices;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- Collection<Integer> bCastSet = getRuntimeContext()
- .getBroadcastVariable("numberOfVertices");
- numberOfVertices = bCastSet.iterator().next();
+ private long numberOfVertices;
+
+ public AvgNodeDegreeMapper(long numberOfVertices) {
+ this.numberOfVertices = numberOfVertices;
}
-
+
public Double map(Tuple2<Long, Long> sumTuple) {
return (double) (sumTuple.f1 / numberOfVertices) ;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 3e6610e..00ae204 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,11 +20,6 @@ package org.apache.flink.graph.library;
import java.io.Serializable;
-<<<<<<< HEAD
-import org.apache.flink.api.java.DataSet;
-=======
-import org.apache.flink.api.java.ExecutionEnvironment;
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
@@ -45,25 +40,13 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
}
@Override
- public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+ public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
- DataSet<Integer> numberOfVertices = network.numberOfVertices();
+ final long numberOfVertices = network.numberOfVertices();
VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
- new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-<<<<<<< HEAD
-
- iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
- iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
-
-=======
- try {
- iteration.addBroadcastSetForUpdateFunction("numberOfVertices",
- ExecutionEnvironment.getExecutionEnvironment().fromElements(network.numberOfVertices()));
- } catch (Exception e) {
- e.printStackTrace();
- }
->>>>>>> [FLINK-1632][gelly] Deleted GraphUtils and made Gelly methods use DS.count()
+ new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+ maxIterations);
return network.runVertexCentricIteration(iteration);
}
@@ -76,15 +59,11 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
extends VertexUpdateFunction<K, Double, Double> {
private final double beta;
- private int numVertices;
+ private final long numVertices;
- public VertexRankUpdater(double beta) {
+ public VertexRankUpdater(double beta, long numberOfVertices) {
this.beta = beta;
- }
-
- @Override
- public void preSuperstep(){
- numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+ this.numVertices = numberOfVertices;
}
@Override
@@ -110,11 +89,10 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
public static final class RankMessenger<K extends Comparable<K> & Serializable>
extends MessagingFunction<K, Double, Double, Double> {
- private int numVertices;
+ private final long numVertices;
- @Override
- public void preSuperstep(){
- numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+ public RankMessenger(long numberOfVertices) {
+ this.numVertices = numberOfVertices;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 339e38d..101e82c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -33,6 +33,6 @@ import org.apache.flink.graph.Graph;
public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements Serializable {
- public abstract Boolean validate(Graph<K, VV, EV> graph) throws Exception;
+ public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/a1daadcb/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index aeca482..cc06ca7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -38,11 +38,11 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
* Checks that the edge set input contains valid vertex Ids, i.e. that they
* also exist in the vertex input set.
*
- * @return a Boolean stating whether a graph is valid
+ * @return a boolean stating whether a graph is valid
* with respect to its vertex ids.
*/
@Override
- public Boolean validate(Graph<K, VV, EV> graph) throws Exception {
+ public boolean validate(Graph<K, VV, EV> graph) throws Exception {
DataSet<Tuple1<K>> edgeIds = graph.getEdges()
.flatMap(new MapEdgeIds<K, EV>()).distinct();
DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
@@ -76,10 +76,4 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
}
}
- private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> {
- public Boolean map(Integer numberOfInvalidIds) throws Exception {
- return numberOfInvalidIds == 0;
- }
- }
-
}
\ No newline at end of file