You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:30:44 UTC
[3/5] flink git commit: [FLINK-1758] [gelly] Neighborhood Methods
Extensions
[FLINK-1758] [gelly] Neighborhood Methods Extensions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9de640af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9de640af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9de640af
Branch: refs/heads/master
Commit: 9de640af9cd23927bbd59a0b5794b93e85956551
Parents: 6e24879
Author: andralungu <lu...@gmail.com>
Authored: Tue Apr 7 22:29:18 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 14:05:38 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 59 +--
.../org/apache/flink/graph/EdgeDirection.java | 8 +-
.../org/apache/flink/graph/EdgesFunction.java | 5 +-
.../graph/EdgesFunctionWithVertexValue.java | 5 +-
.../main/java/org/apache/flink/graph/Graph.java | 159 ++++++-
.../apache/flink/graph/NeighborsFunction.java | 5 +-
.../graph/NeighborsFunctionWithVertexValue.java | 5 +-
.../apache/flink/graph/ReduceEdgesFunction.java | 36 ++
.../flink/graph/ReduceNeighborsFunction.java | 40 ++
.../flink/graph/example/MusicProfiles.java | 8 +-
.../operations/ReduceOnEdgesMethodsITCase.java | 457 ++++++++++++++++---
.../ReduceOnNeighborMethodsITCase.java | 456 ++++++++++++++++--
12 files changed, 1090 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 21bc335..1a02e6d 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -266,7 +266,10 @@ Neighborhood Methods
Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
-`reduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `reduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
+`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access to both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all incoming edges (neighbors) of a vertex, `OUT` will gather all outgoing edges (neighbors), while `ALL` will gather all edges (neighbors).
+
+The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods can return zero, one, or more values per vertex.
+When exactly one value per vertex is needed and it can be computed by pairwise combining the edges (or neighbors), `reduceOnEdges()` or `reduceOnNeighbors()` should be used instead, as they are more efficient.
For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph:
@@ -279,25 +282,28 @@ The following code will collect the out-edges for each vertex and apply the `Sel
{% highlight java %}
Graph<Long, Long, Double> graph = ...
-DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges(
+DataSet<Tuple2<Long, Double>> minWeights = graph.groupReduceOnEdges(
new SelectMinWeight(), EdgeDirection.OUT);
// user-defined function to select the minimum weight
-static final class SelectMinWeight implements EdgesFunction<Long, Double, Tuple2<Long, Double>> {
+static final class SelectMinWeight implements EdgesFunction<Long, Double, Tuple2<Long, Double>> {
- public Tuple2<Long, Double> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) {
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
+ Collector<Tuple2<Long, Double>> out) throws Exception {
- long minWeight = Double.MAX_VALUE;
- long vertexId = -1;
+ double minWeight = Double.MAX_VALUE;
+ long vertexId = -1;
- for (Tuple2<Long, Edge<Long, Double>> edge: edges) {
- if (edge.f1.getValue() < weight) {
- weight = edge.f1.getValue();
- vertexId = edge.f0;
- }
- return new Tuple2<Long, Double>(vertexId, minWeight);
- }
-}
+ for (Tuple2<Long, Edge<Long, Double>> edge: edges) {
+ if (edge.f1.getValue() < minWeight) {
+ minWeight = edge.f1.getValue();
+ vertexId = edge.f0;
+ }
+ }
+ out.collect(new Tuple2<Long, Double>(vertexId, minWeight));
+ }
+}
{% endhighlight %}
<p class="text-center">
@@ -313,28 +319,23 @@ DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(
new SumValues(), EdgeDirection.IN);
// user-defined function to sum the neighbor values
-static final class SumValues implements NeighborsFunction<Long, Long, Double, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Double>,
- Vertex<Long, Long>>> neighbors) {
-
- long sum = 0;
- long vertexId = -1;
+static final class SumValues implements ReduceNeighborsFunction<Long, Long, Double> {
- for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> neighbor : neighbors) {
- vertexId = neighbor.f0;
- sum += neighbor.f2.getValue();
- }
- return new Tuple2<Long, Long>(vertexId, sum);
- }
+ @Override
+ public Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> firstNeighbor,
+ Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> secondNeighbor) {
+ long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
+ return new Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
+ new Vertex<Long, Long>(firstNeighbor.f0, sum));
+ }
}
{% endhighlight %}
<p class="text-center">
- <img alt="reduseOnNeighbors Example" width="70%" src="fig/gelly-reduceOnNeighbors.png"/>
+ <img alt="reduceOnNeighbors Example" width="70%" src="img/gelly-reduceOnNeighbors.png"/>
</p>
-When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
+When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
index 379b302..65d4098 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph;
/**
* The EdgeDirection is used to select a node's neighborhood
- * by the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)},
- * {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
- * {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
- * {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
* methods.
*/
public enum EdgeDirection {
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index d35385f..aac63db 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
/**
* Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)} method.
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
*
* @param <K> the vertex key type
* @param <EV> the edge value type
@@ -34,5 +35,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
public interface EdgesFunction<K extends Comparable<K> & Serializable,
EV extends Serializable, O> extends Function, Serializable {
- O iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception;
+ void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index dd0f518..f4f4320 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -21,10 +21,11 @@ package org.apache.flink.graph;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
/**
* Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
* method.
*
* @param <K> the vertex key type
@@ -35,5 +36,5 @@ import org.apache.flink.api.common.functions.Function;
public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {
- O iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception;
+ void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f843827..1325e0c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -28,10 +28,11 @@ import java.util.Arrays;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -722,8 +723,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a dataset of a T
* @throws IllegalArgumentException
*/
- public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -753,8 +754,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a dataset of T
* @throws IllegalArgumentException
*/
- public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
@@ -796,7 +797,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
- out.collect(function.iterateEdges(edges));
+ function.iterateEdges(edges, out);
}
@Override
@@ -832,7 +833,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
public void coGroup(Iterable<Vertex<K, VV>> vertex,
Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
- out.collect(function.iterateEdges(vertex.iterator().next(), edges));
+ function.iterateEdges(vertex.iterator().next(), edges, out);
}
@Override
@@ -880,7 +881,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
};
- out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable));
+ function.iterateEdges(vertex.iterator().next(), edgesIterable, out);
}
@Override
@@ -1238,8 +1239,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a dataset of a T
* @throws IllegalArgumentException
*/
- public <T> DataSet<T> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1281,8 +1282,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* @return a dataset of a T
* @throws IllegalArgumentException
*/
- public <T> DataSet<T> reduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
- EdgeDirection direction) throws IllegalArgumentException {
+ public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
@@ -1322,8 +1323,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
- out.collect(function.iterateNeighbors(edges));
-
+ function.iterateNeighbors(edges, out);
}
@Override
@@ -1368,7 +1368,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
Collector<T> out) throws Exception {
- out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors));
+ function.iterateNeighbors(vertex.iterator().next(), neighbors, out);
}
@Override
@@ -1417,8 +1417,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
};
- out.collect(function.iterateNeighbors(vertex.iterator().next(),
- neighborsIterable));
+ function.iterateNeighbors(vertex.iterator().next(), neighborsIterable, out);
}
@Override
@@ -1426,4 +1425,126 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
}
}
-}
\ No newline at end of file
+
+ /**
+ * Compute an aggregate over the neighbors (edges and vertices) of each
+ * vertex by pairwise combining them with the given function. The function
+ * only has access to the vertex id (not the vertex value).
+ *
+ * @param reduceNeighborsFunction the function to apply to the neighborhood
+ * @param direction the edge direction (in-, out-, all-)
+ * @return a dataset of (vertex id, aggregated neighbor value) pairs, one per vertex with neighbors in the given direction
+ * @throws IllegalArgumentException if the edge direction is not IN, OUT or ALL
+ */
+ public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+ switch (direction) {
+ case IN:
+ // create <edge-sourceVertex> pairs
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
+ .join(this.vertices).where(0).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(1));
+ return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
+ .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ case OUT:
+ // create <edge-targetVertex> pairs
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectVertexIdJoin<K, VV, EV>(0));
+ return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
+ .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ case ALL:
+ // create <edge-sourceOrTargetVertex> pairs
+ DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
+ .flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
+ .join(this.vertices).where(1).equalTo(0)
+ .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+
+ return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
+ .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements ReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+ /** The user-defined function that combines two neighbors grouped under the same vertex key. */
+ private final ReduceNeighborsFunction<K, VV, EV> function;
+
+ public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV, EV> fun) {
+ this.function = fun;
+ }
+
+ @Override
+ public Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduce(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> first,
+ Tuple3<K, Edge<K, EV>, Vertex<K, VV>> second) throws Exception {
+ return function.reduceNeighbors(first, second);
+ }
+ }
+
+ public static final class ApplyNeighborhoodMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements MapFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, Tuple2<K, VV>> {
+ // Keeps only the grouping key and the value carried by the reduced neighbor vertex.
+ @Override
+ public Tuple2<K, VV> map(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reducedNeighbor) throws Exception {
+ return new Tuple2<K, VV>(reducedNeighbor.f0, reducedNeighbor.f2.getValue());
+ }
+ }
+
+ /**
+ * Compute an aggregate over the edges of each vertex by pairwise combining
+ * them with the given function. The function only has access to the vertex id (not the vertex value).
+ *
+ * @param reduceEdgesFunction
+ * the function to apply to the neighborhood
+ * @param direction
+ * the edge direction (in-, out-, all-)
+ * @return a dataset of (vertex id, aggregated edge value) pairs, one per vertex with edges in the given direction
+ * @throws IllegalArgumentException if the edge direction is not IN, OUT or ALL
+ */
+ public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
+ EdgeDirection direction) throws IllegalArgumentException {
+
+ switch (direction) {
+ case IN:
+ return edges.map(new ProjectVertexIdMap<K, EV>(1))
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .map(new ApplyEdgesMapFunction<K, VV, EV>());
+ case OUT:
+ return edges.map(new ProjectVertexIdMap<K, EV>(0))
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .map(new ApplyEdgesMapFunction<K, VV, EV>());
+ case ALL:
+ return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+ .map(new ApplyEdgesMapFunction<K, VV, EV>());
+ default:
+ throw new IllegalArgumentException("Illegal edge direction");
+ }
+ }
+
+ private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
+ implements ReduceFunction<Tuple2<K, Edge<K, EV>>> {
+ /** The user-defined function that combines two edges grouped under the same vertex key. */
+ private final ReduceEdgesFunction<K, EV> function;
+
+ public ApplyReduceFunction(ReduceEdgesFunction<K, EV> fun) {
+ this.function = fun;
+ }
+
+ @Override
+ public Tuple2<K, Edge<K, EV>> reduce(Tuple2<K, Edge<K, EV>> first, Tuple2<K, Edge<K, EV>> second) throws Exception {
+ return function.reduceEdges(first, second);
+ }
+ }
+
+ public static final class ApplyEdgesMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements MapFunction<Tuple2<K, Edge<K, EV>>, Tuple2<K, EV>> {
+ // Keeps only the grouping key and the value of the reduced edge; the VV type parameter is not used here.
+ @Override
+ public Tuple2<K, EV> map(Tuple2<K, Edge<K, EV>> keyedEdge) throws Exception {
+ return new Tuple2<K, EV>(keyedEdge.f0, keyedEdge.f1.getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index a2d28b2..b43f9d1 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
/**
* Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}
* method.
*
* @param <K> the vertex key type
@@ -36,5 +37,5 @@ import org.apache.flink.api.java.tuple.Tuple3;
public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable, O> extends Function, Serializable {
- O iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception;
+ void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index 438ed8a..32d184d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
/**
* Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
* method.
*
* @param <K> the vertex key type
@@ -36,5 +37,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable, O> extends Function, Serializable {
- O iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception;
+ void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
new file mode 100644
index 0000000..0b5d2cf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} method.
+ *
+ * @param <K> the vertex key type
+ * @param <EV> the edge value type
+ */
+public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
+ EV extends Serializable> extends org.apache.flink.api.common.functions.Function, Serializable {
+ // Combines two edges grouped under the same vertex key; it is run inside a Flink reduce, so it should be associative.
+ Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge, Tuple2<K, Edge<K, EV>> secondEdge);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
new file mode 100644
index 0000000..50c0d35
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.io.Serializable;
+
+/**
+ * Function applied to a vertex neighborhood by
+ * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}:
+ * it combines two (vertex id, edge, neighbor vertex) entries into one.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public interface ReduceNeighborsFunction<K extends Comparable<K> & Serializable,
+ VV extends Serializable, EV extends Serializable> extends Function, Serializable {
+
+ Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> firstNeighbor,
+ Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 9b18623..e8871eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -100,7 +100,7 @@ public class MusicProfiles implements ProgramDescription {
* Get the top track (most listened) for each user
*/
DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
- .reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
+ .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
.filter(new FilterSongNodes());
if (fileOutput) {
@@ -185,8 +185,8 @@ public class MusicProfiles implements ProgramDescription {
public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer,
Tuple2<String, String>> {
- public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex,
- Iterable<Edge<String, Integer>> edges) {
+ public void iterateEdges(Vertex<String, NullValue> vertex,
+ Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
int maxPlaycount = 0;
String topSong = "";
@@ -196,7 +196,7 @@ public class MusicProfiles implements ProgramDescription {
topSong = edge.getTarget();
}
}
- return new Tuple2<String, String>(vertex.getId(), topSong);
+ out.collect(new Tuple2<String, String>(vertex.getId(), topSong));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index ec0c84c..2452cba 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -26,9 +26,11 @@ import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.EdgesFunction;
import org.apache.flink.graph.EdgesFunctionWithVertexValue;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceEdgesFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -71,7 +73,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+ graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
@@ -93,7 +95,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+ graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
@@ -105,6 +107,210 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testAllOutNeighbors() throws Exception {
+ /*
+ * Get the all the out-neighbors for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+ graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
+ verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "1,3\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,5\n" +
+ "5,1";
+ }
+
+ @Test
+ public void testAllOutNeighborsNoValue() throws Exception {
+ /*
+ * Get the all the out-neighbors for each vertex except for the vertex with id 5.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+ graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
+ verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "1,3\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,5";
+ }
+
+ @Test
+ public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception {
+ /*
+ * Get the all the out-neighbors for each vertex that have a value greater than two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+ graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
+ verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "3,4\n" +
+ "3,5\n" +
+ "4,5\n" +
+ "5,1";
+ }
+
+ @Test
+ public void testAllInNeighbors() throws Exception {
+ /*
+ * Get the all the in-neighbors for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+ graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
+ verticesWithAllInNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,5\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "4,3\n" +
+ "5,3\n" +
+ "5,4";
+ }
+
+ @Test
+ public void testAllInNeighborsNoValue() throws Exception {
+ /*
+ * Get the all the in-neighbors for each vertex except for the vertex with id 5.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+ graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
+ verticesWithAllInNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,5\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "4,3";
+ }
+
+ @Test
+ public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception {
+ /*
+ * Get the all the in-neighbors for each vertex that have a value greater than two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+ graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
+ verticesWithAllInNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "3,1\n" +
+ "3,2\n" +
+ "4,3\n" +
+ "5,3\n" +
+ "5,4";
+ }
+
+ @Test
+ public void testAllNeighbors() throws Exception {
+ /*
+ * Get the all the neighbors for each vertex
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+ graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
+ verticesWithAllNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "1,3\n" +
+ "1,5\n" +
+ "2,1\n" +
+ "2,3\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,3\n" +
+ "4,5\n" +
+ "5,1\n" +
+ "5,3\n" +
+ "5,4";
+ }
+
+ @Test
+ public void testAllNeighborsNoValue() throws Exception {
+ /*
+ * Get the all the neighbors for each vertex except for vertices with id 5 and 2.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+ graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
+ verticesWithAllNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,2\n" +
+ "1,3\n" +
+ "1,5\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,3\n" +
+ "4,5";
+ }
+
+ @Test
+ public void testAllNeighborsWithValueGreaterThanFour() throws Exception {
+ /*
+ * Get the all the neighbors for each vertex that have a value greater than four.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+ graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
+ verticesWithAllNeighbors.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "5,1\n" +
+ "5,3\n" +
+ "5,4";
+ }
+
+ @Test
public void testMaxWeightEdge() throws Exception {
/*
* Get the maximum weight among all edges
@@ -115,7 +321,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
- graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+ graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
env.execute();
@@ -159,7 +365,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+ graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
env.execute();
@@ -195,28 +401,29 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateEdges(
- Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
-
+ long minNeighborId = 0;
+
for (Edge<Long, Long> edge: edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
- minNeighorId = edge.getTarget();
+ minNeighborId = edge.getTarget();
}
}
- return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+ out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
}
}
@SuppressWarnings("serial")
private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MIN_VALUE;
@@ -225,60 +432,65 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
weight = edge.getValue();
}
}
- return new Tuple2<Long, Long>(v.getId(), weight);
+ out.collect(new Tuple2<Long, Long>(v.getId(), weight));
}
}
@SuppressWarnings("serial")
- private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+ private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+ @Override
+ public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
+ Tuple2<Long, Edge<Long, Long>> secondEdge) {
- long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
- long vertexId = -1;
- long i=0;
-
- for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
- if (edge.f1.getValue() < weight) {
- weight = edge.f1.getValue();
- minNeighorId = edge.f1.getTarget();
- }
- if (i==0) {
- vertexId = edge.f0;
- } i++;
+ if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+ return firstEdge;
+ } else {
+ return secondEdge;
}
- return new Tuple2<Long, Long>(vertexId, minNeighorId);
+
}
}
@SuppressWarnings("serial")
- private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-
- long weight = Long.MIN_VALUE;
- long vertexId = -1;
- long i=0;
-
- for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
- if (edge.f1.getValue() > weight) {
- weight = edge.f1.getValue();
- }
- if (i==0) {
- vertexId = edge.f0;
- } i++;
+ private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
+
+// @Override
+// public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+// Collector<Tuple2<Long, Long>> out) throws Exception {
+//
+// long weight = Long.MIN_VALUE;
+// long vertexId = -1;
+// long i=0;
+//
+// for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+// if (edge.f1.getValue() > weight) {
+// weight = edge.f1.getValue();
+// }
+// if (i==0) {
+// vertexId = edge.f0;
+// } i++;
+// }
+// out.collect(new Tuple2<Long, Long>(vertexId, weight));
+// }
+
+ @Override
+ public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
+ Tuple2<Long, Edge<Long, Long>> secondEdge) {
+ if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) {
+ return firstEdge;
+ } else {
+ return secondEdge;
}
- return new Tuple2<Long, Long>(vertexId, weight);
}
}
@SuppressWarnings("serial")
private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateEdges(
- Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges) {
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v,
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
long minNeighorId = 0;
@@ -289,14 +501,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
minNeighorId = edge.getSource();
}
}
- return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+ out.collect(new Tuple2<Long, Long>(v.getId(), minNeighorId));
}
}
@SuppressWarnings("serial")
private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
long minNeighorId = 0;
@@ -312,7 +526,146 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
vertexId = edge.f0;
} i++;
}
- return new Tuple2<Long, Long>(vertexId, minNeighorId);
+ out.collect(new Tuple2<Long, Long>(vertexId, minNeighorId));
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectOutNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectOutNeighborsExcludeFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if(edge.f0 != 5) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectOutNeighborsValueGreaterThanTwo implements
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+ for (Edge<Long, Long> edge: edges) {
+ if(v.getValue() > 2) {
+ out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectInNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectInNeighborsExceptFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if(edge.f0 != 5) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectInNeighborsValueGreaterThanTwo implements
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+ for (Edge<Long, Long> edge: edges) {
+ if(v.getValue() > 2) {
+ out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if (edge.f0 == edge.f1.getTarget()) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+ } else {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectNeighborsExceptFiveAndTwo implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if(edge.f0 != 5 && edge.f0 != 2) {
+ if (edge.f0 == edge.f1.getTarget()) {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+ } else {
+ out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+ }
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class SelectNeighborsValueGreaterThanFour implements
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+ for(Edge<Long, Long> edge : edges) {
+ if(v.getValue() > 4) {
+ if(v.getId().equals(edge.getTarget())) {
+ out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+ } else {
+ out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+ }
+ }
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 785552c..5300d24 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.test.operations;
import java.util.Iterator;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,9 +30,11 @@ import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.NeighborsFunction;
import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.ReduceNeighborsFunction;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -74,7 +77,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+ graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
env.execute();
@@ -96,7 +99,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+ graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
verticesWithSum.writeAsCsv(resultPath);
env.execute();
@@ -119,7 +122,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+ graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
env.execute();
@@ -132,6 +135,65 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception {
+ /*
+ * Get the sum of out-neighbor values
+ * for each vertex with id greater than three.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "4,5\n" +
+ "5,1\n";
+ }
+
+ @Test
+ public void testSumOfInNeighborsIdGreaterThanThree() throws Exception {
+ /*
+ * Get the sum of in-neighbor values
+ * times the edge weights for each vertex with id greater than three.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
+
+ verticesWithSum.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "4,102\n" +
+ "5,285\n";
+ }
+
+ @Test
+ public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception {
+ /*
+ * Get the sum of all neighbor values
+ * including own vertex value
+ * for each vertex with id greater than three.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "4,12\n" +
+ "5,13\n";
+ }
+
+ @Test
public void testSumOfOutNeighborsNoValue() throws Exception {
/*
* Get the sum of out-neighbor values
@@ -166,14 +228,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
-
verticesWithSum.writeAsCsv(resultPath);
env.execute();
expectedResult = "1,255\n" +
- "2,12\n" +
+ "2,12\n" +
"3,59\n" +
- "4,102\n" +
+ "4,102\n" +
"5,285\n";
}
@@ -187,10 +248,10 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
env.execute();
expectedResult = "1,10\n" +
@@ -200,33 +261,191 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"5,8\n";
}
+ @Test
+ public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+ /*
+ * Get the sum of out-neighbor values
+ * for each vertex with id greater than two as well as the same sum multiplied by two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "3,9\n" +
+ "3,18\n" +
+ "4,5\n" +
+ "4,10\n" +
+ "5,1\n" +
+ "5,2";
+ }
+
+ @Test
+ public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+ /*
+ * Get the sum of in-neighbor values
+ * for each vertex with id greater than two as well as the same sum multiplied by two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "3,59\n" +
+ "3,118\n" +
+ "4,204\n" +
+ "4,102\n" +
+ "5,570\n" +
+ "5,285";
+ }
+
+ @Test
+ public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+ /*
+ * Get the sum of all neighbor values
+ * for each vertex with id greater than two as well as the same sum multiplied by two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
+ graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
+
+ verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "3,12\n" +
+ "3,24\n" +
+ "4,8\n" +
+ "4,16\n" +
+ "5,8\n" +
+ "5,16";
+ }
+
+ @Test
+ public void testSumOfOutNeighborsMultipliedByTwo() throws Exception {
+ /*
+ * Get the sum of out-neighbor values
+ * for each vertex as well as the sum of out-neighbor values multiplied by two.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,5\n" +
+ "1,10\n" +
+ "2,3\n" +
+ "2,6\n" +
+ "3,9\n" +
+ "3,18\n" +
+ "4,5\n" +
+ "4,10\n" +
+ "5,1\n" +
+ "5,2";
+ }
+
+ @Test
+ public void testSumOfInNeighborsSubtractOne() throws Exception {
+ /*
+ * Get the sum of in-neighbor values
+ * times the edge weights for each vertex as well as the same sum minus one.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
+
+ verticesWithSum.writeAsCsv(resultPath);
+ env.execute();
+ expectedResult = "1,255\n" +
+ "1,254\n" +
+ "2,12\n" +
+ "2,11\n" +
+ "3,59\n" +
+ "3,58\n" +
+ "4,102\n" +
+ "4,101\n" +
+ "5,285\n" +
+ "5,284";
+ }
+
+ @Test
+ public void testSumOfOAllNeighborsAddFive() throws Exception {
+ /*
+ * Get the sum of all neighbor values
+ * including own vertex value
+ * for each vertex as well as the same sum plus five.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
+
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
+
+ verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+ env.execute();
+
+ expectedResult = "1,11\n" +
+ "1,16\n" +
+ "2,6\n" +
+ "2,11\n" +
+ "3,15\n" +
+ "3,20\n" +
+ "4,12\n" +
+ "4,17\n" +
+ "5,13\n" +
+ "5,18";
+ }
+
@SuppressWarnings("serial")
private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+ @Override
+ public void iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
- return new Tuple2<Long, Long>(vertex.getId(), sum);
+ out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
}
}
@SuppressWarnings("serial")
private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+ @Override
+ public void iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
- return new Tuple2<Long, Long>(vertex.getId(), sum);
+ out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
}
}
@@ -234,23 +453,120 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
Tuple2<Long, Long>> {
- public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+ @Override
+ public void iterateNeighbors(Vertex<Long, Long> vertex,
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
- return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
+ out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
}
}
@SuppressWarnings("serial")
- private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+	private static final class SumOutNeighborsIdGreaterThanThree
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Emits (vertex id, sum of neighbor vertex values) only for vertices whose id is
+		 * greater than 3; vertices with id <= 3 produce no record.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long acc = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> item : neighbors) {
+				acc += item.f1.getValue();
+			}
+			boolean emit = vertex.getId() > 3;
+			if (emit) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), acc));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsIdGreaterThanThree
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Computes the sum of (edge value * neighbor value) over all given neighbors and emits
+		 * (vertex id, sum), but only for vertices whose id is greater than 3.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long weightedTotal = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> entry : neighbors) {
+				long edgeWeight = entry.f0.getValue();
+				long neighborValue = entry.f1.getValue();
+				weightedTotal += edgeWeight * neighborValue;
+			}
+
+			// Vertices with id <= 3 produce no output record.
+			if (vertex.getId() <= 3) {
+				return;
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), weightedTotal));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsIdGreaterThanThree
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Emits (vertex id, own value + sum of neighbor values) for vertices whose id is
+		 * greater than 3; all other vertices are skipped.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long neighborTotal = 0;
+			Iterator<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> it = neighbors.iterator();
+			while (it.hasNext()) {
+				neighborTotal += it.next().f1.getValue();
+			}
+
+			if (vertex.getId() > 3) {
+				long total = neighborTotal + vertex.getValue();
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), total));
+			}
+		}
+	}
+
+	/**
+	 * Reduce step over neighbor tuples: adds the two neighbor-vertex values (f2) and returns a
+	 * tuple that keeps the first input's key (f0) and edge (f1).
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			Long targetId = firstNeighbor.f0;
+			long left = firstNeighbor.f2.getValue();
+			long right = secondNeighbor.f2.getValue();
+			Vertex<Long, Long> combined = new Vertex<Long, Long>(targetId, left + right);
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(targetId, firstNeighbor.f1, combined);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		/**
+		 * Combines two neighbor tuples into one whose vertex value is the sum of
+		 * (neighbor vertex value * edge value) over both inputs; the key (f0) of the first
+		 * input is kept.
+		 *
+		 * <p>As with any reduce function, a value returned here may be passed back in as an
+		 * input. The original version kept the first input's edge, so an already weighted
+		 * partial sum was multiplied by an edge value a second time. That made the reduction
+		 * non-associative and wrong for vertices with three or more neighbors. The result now
+		 * carries an edge with value 1, so a partial sum passes through unchanged regardless of
+		 * the order in which inputs are combined.
+		 *
+		 * <p>NOTE(review): a vertex with exactly one neighbor is presumably never passed
+		 * through this method, so its edge value would not be applied here. Confirm how
+		 * Graph#reduceOnNeighbors and the test's expected output handle that case.
+		 */
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue()
+					+ secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue();
+			// Unit weight: 1 * sum == sum on any later reduction step.
+			Edge<Long, Long> unitEdge = new Edge<Long, Long>(
+					firstNeighbor.f1.getSource(), firstNeighbor.f1.getTarget(), 1L);
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, unitEdge,
+					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		/**
+		 * Merges two neighbor tuples by adding their neighbor-vertex values. The result keeps
+		 * the first tuple's key (f0) and edge (f1); its vertex carries that key and the sum.
+		 */
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+				Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			final Vertex<Long, Long> a = firstNeighbor.f2;
+			final Vertex<Long, Long> b = secondNeighbor.f2;
+			final long total = a.getValue() + b.getValue();
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(
+					firstNeighbor.f0,
+					firstNeighbor.f1,
+					new Vertex<Long, Long>(firstNeighbor.f0, total));
+		}
+	}
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+ @SuppressWarnings("serial")
+ private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
@@ -260,17 +576,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighborsIterator.next();
sum += next.f2.getValue();
}
- return new Tuple2<Long, Long>(next.f0, sum);
+ if(next.f0 > 2) {
+ out.collect(new Tuple2<Long, Long>(next.f0, sum));
+ out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+ }
}
}
@SuppressWarnings("serial")
- private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
-
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
+ private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
@@ -279,17 +599,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighborsIterator.next();
sum += next.f2.getValue() * next.f1.getValue();
}
- return new Tuple2<Long, Long>(next.f0, sum);
+ if(next.f0 > 2) {
+ out.collect(new Tuple2<Long, Long>(next.f0, sum));
+ out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+ }
}
}
@SuppressWarnings("serial")
- private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+ Tuple2<Long, Long>> {
+
+ @Override
+ public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
- public Tuple2<Long, Long> iterateNeighbors(
- Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
@@ -298,7 +622,65 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighborsIterator.next();
sum += next.f2.getValue();
}
- return new Tuple2<Long, Long>(next.f0, sum);
+ if(next.f0 > 2) {
+ out.collect(new Tuple2<Long, Long>(next.f0, sum));
+ out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+ }
+ }
+ }
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsMultipliedByTwo
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Emits two records per call: (id, s) followed by (id, 2 * s), where s is the sum of
+		 * the neighbor vertex values.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long neighborSum = 0L;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> n : neighbors) {
+				neighborSum += n.f1.getValue();
+			}
+			final Long id = vertex.getId();
+			out.collect(new Tuple2<Long, Long>(id, neighborSum));
+			out.collect(new Tuple2<Long, Long>(id, 2 * neighborSum));
}
}
-}
\ No newline at end of file
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsSubtractOne
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Emits (id, s) followed by (id, s - 1), where s sums (edge value * neighbor value)
+		 * over all given neighbors.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long weighted = 0;
+			Iterator<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> iter = neighbors.iterator();
+			while (iter.hasNext()) {
+				Tuple2<Edge<Long, Long>, Vertex<Long, Long>> pair = iter.next();
+				weighted += pair.f0.getValue() * pair.f1.getValue();
+			}
+			Long vertexId = vertex.getId();
+			out.collect(new Tuple2<Long, Long>(vertexId, weighted));
+			out.collect(new Tuple2<Long, Long>(vertexId, weighted - 1));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsAddFive
+			implements NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		/**
+		 * Emits (id, t) followed by (id, t + 5), where t is the vertex's own value plus the
+		 * sum of its neighbor vertex values.
+		 */
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long neighborhoodSum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				neighborhoodSum += neighbor.f1.getValue();
+			}
+			long total = neighborhoodSum + vertex.getValue();
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), total));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), total + 5));
+		}
+	}
+}
+