You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:30:43 UTC
[2/5] flink git commit: [FLINK-1758][gelly] Made reduce methods operate on values
[FLINK-1758][gelly] Made reduce methods operate on values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea9d9a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea9d9a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea9d9a0f
Branch: refs/heads/master
Commit: ea9d9a0f672e55f6947e2b04e6628eb0167ef1ec
Parents: d015bb0
Author: andralungu <lu...@gmail.com>
Authored: Mon Apr 20 23:09:36 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 14:05:38 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 11 +-
.../main/java/org/apache/flink/graph/Graph.java | 142 +++++++++++--------
.../apache/flink/graph/ReduceEdgesFunction.java | 2 +-
.../flink/graph/ReduceNeighborsFunction.java | 9 +-
.../operations/ReduceOnEdgesMethodsITCase.java | 24 ++--
.../ReduceOnNeighborMethodsITCase.java | 22 ++-
6 files changed, 114 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 6390d6f..6488029 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -320,14 +320,13 @@ DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(
new SumValues(), EdgeDirection.IN);
// user-defined function to sum the neighbor values
-static final class SumValues implements ReduceNeighborsFunction<Long, Long, Double> {
+static final class SumValues implements ReduceNeighborsFunction<Long, Long> {
- public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+ public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+ Tuple2<Long, Long> secondNeighbor) {
- long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
- return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
- new Vertex<Long, Long>(firstNeighbor.f0, sum));
+ long sum = firstNeighbor.f1 + secondNeighbor.f1;
+ return new Tuple2<Long, Long>(firstNeighbor.f0, sum);
}
}
{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 48d39b1..e21fd93 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -787,6 +787,21 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
+ private static final class ProjectVertexWithEdgeValueMap<K extends Comparable<K> & Serializable, EV extends Serializable>
+ implements MapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+
+ private int fieldPosition;
+
+ public ProjectVertexWithEdgeValueMap(int position) {
+ this.fieldPosition = position;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Tuple2<K, EV> map(Edge<K, EV> edge) {
+ return new Tuple2<K, EV>((K) edge.getField(fieldPosition), edge.getValue());
+ }
+ }
+
private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable, T>
implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
@@ -814,6 +829,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
+ private static final class EmitOneVertexWithEdgeValuePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements FlatMapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+ public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
+ out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
+ out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
+ }
+ }
+
private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
@@ -1332,27 +1355,50 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
- private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
- implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+ private static final class ProjectVertexWithNeighborValueJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
private int fieldPosition;
- public ProjectVertexIdJoin(int position) {
+ public ProjectVertexWithNeighborValueJoin(int position) {
this.fieldPosition = position;
}
@SuppressWarnings("unchecked")
public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
- Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
+ Collector<Tuple2<K, VV>> out) {
+ out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue()));
+ }
+ }
+
+ private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+ private int fieldPosition;
+ public ProjectVertexIdJoin(int position) {
+ this.fieldPosition = position;
+ }
+ @SuppressWarnings("unchecked")
+ public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
+ Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
}
}
+ private static final class ProjectNeighborValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+ implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
+ @SuppressWarnings("unchecked")
+ public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
+ Collector<Tuple2<K, VV>> out) {
+
+ out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
+ }
+ }
+
private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+ @SuppressWarnings("unchecked")
public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
- Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-
+ Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
}
}
@@ -1433,65 +1479,53 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
*
* @param reduceNeighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
- * @return a dataset containing one value per vertex
+ * @return a dataset containing one value per vertex (vertex id, vertex value)
* @throws IllegalArgumentException
*/
- public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
+ public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<K, VV> reduceNeighborsFunction,
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
- // create <edge-sourceVertex> pairs
- final DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
+ // create <vertex-source value> pairs
+ final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
.join(this.vertices).where(0).equalTo(0)
- .with(new ProjectVertexIdJoin<K, VV, EV>(1));
- return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction<K,VV,EV>(reduceNeighborsFunction))
- .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1));
+ return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
case OUT:
- // create <edge-targetVertex> pairs
- DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
+ // create <vertex-target value> pairs
+ DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectVertexIdJoin<K, VV, EV>(0));
- return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
- .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ .with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0));
+ return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
case ALL:
- // create <edge-sourceOrTargetVertex> pairs
- DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
+ // create <vertex-neighbor value> pairs
+ DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
.join(this.vertices).where(1).equalTo(0)
- .with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+ .with(new ProjectNeighborValue<K, VV, EV>());
- return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
- .map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+ return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
- private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
- implements ReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+ private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable>
+ implements ReduceFunction<Tuple2<K, VV>> {
- private ReduceNeighborsFunction<K, VV, EV> function;
+ private ReduceNeighborsFunction<K, VV> function;
- public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV, EV> fun) {
+ public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV> fun) {
this.function = fun;
}
@Override
- public Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduce(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> first,
- Tuple3<K, Edge<K, EV>, Vertex<K, VV>> second) throws Exception {
+ public Tuple2<K, VV> reduce(Tuple2<K, VV> first,
+ Tuple2<K, VV> second) throws Exception {
return function.reduceNeighbors(first, second);
}
}
- public static final class ApplyNeighborhoodMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
- implements MapFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>> ,Tuple2<K, VV>> {
-
- @Override
- public Tuple2<K, VV> map(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> edgesWithSrc) throws Exception {
- return new Tuple2<K, VV>(edgesWithSrc.f0, edgesWithSrc.f2.getValue());
- }
- }
-
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex id (not the vertex value).
@@ -1500,32 +1534,29 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
* the function to apply to the neighborhood
* @param direction
* the edge direction (in-, out-, all-)
- * @return a dataset containing one value per vertex
+ * @return a dataset containing one value per vertex (vertex key, edge value)
* @throws IllegalArgumentException
*/
- public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
+ public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
EdgeDirection direction) throws IllegalArgumentException {
switch (direction) {
case IN:
- return edges.map(new ProjectVertexIdMap<K, EV>(1))
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
- .map(new ApplyEdgesMapFunction());
+ return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
case OUT:
- return edges.map(new ProjectVertexIdMap<K, EV>(0))
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
- .map(new ApplyEdgesMapFunction());
+ return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
case ALL:
- return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
- .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
- .map(new ApplyEdgesMapFunction());
+ return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, VV, EV>())
+ .groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
- implements ReduceFunction<Tuple2<K, Edge<K, EV>>> {
+ implements ReduceFunction<Tuple2<K, EV>> {
private ReduceEdgesFunction<K, EV> function;
@@ -1534,17 +1565,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
@Override
- public Tuple2<K, Edge<K, EV>> reduce(Tuple2<K, Edge<K, EV>> first, Tuple2<K, Edge<K, EV>> second) throws Exception {
+ public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
return function.reduceEdges(first, second);
}
}
-
- public static final class ApplyEdgesMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
- implements MapFunction<Tuple2<K, Edge<K, EV>> ,Tuple2<K, EV>> {
-
- @Override
- public Tuple2<K, EV> map(Tuple2<K, Edge<K, EV>> edge) throws Exception {
- return new Tuple2<K, EV>(edge.f0, edge.f1.getValue());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 53c7934..a411aec 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -33,5 +33,5 @@ import java.io.Serializable;
public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
EV extends Serializable> extends Function, Serializable {
- Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge, Tuple2<K, Edge<K, EV>> secondEdge);
+ Tuple2<K, EV> reduceEdges(Tuple2<K, EV> firstEdge, Tuple2<K, EV> secondEdge);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index f5e978f..8115a5d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.graph;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
@@ -32,9 +32,8 @@ import java.io.Serializable;
* @param <VV> the vertex value type
* @param <EV> the edge value type
*/
-public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable, VV extends Serializable,
- EV extends Serializable> extends Function, Serializable {
+public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable, VV extends Serializable> extends Function, Serializable {
- Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> firstNeighbor,
- Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
+ Tuple2<K, VV> reduceNeighbors(Tuple2<K, VV> firstNeighbor,
+ Tuple2<K, VV> secondNeighbor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index 3ace49a..8a0e258 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -440,10 +440,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
@Override
- public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
- Tuple2<Long, Edge<Long, Long>> secondEdge) {
+ public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+ Tuple2<Long, Long> secondEdge) {
- if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+ if(firstEdge.f1 < secondEdge.f1) {
return firstEdge;
} else {
return secondEdge;
@@ -456,9 +456,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
@Override
- public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
- Tuple2<Long, Edge<Long, Long>> secondEdge) {
- if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) {
+ public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+ Tuple2<Long, Long> secondEdge) {
+ if(firstEdge.f1 > secondEdge.f1) {
return firstEdge;
} else {
return secondEdge;
@@ -474,15 +474,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
long weight = Long.MAX_VALUE;
- long minNeighorId = 0;
+ long minNeighborId = 0;
for (Edge<Long, Long> edge: edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
- minNeighorId = edge.getSource();
+ minNeighborId = edge.getSource();
}
}
- out.collect(new Tuple2<Long, Long>(v.getId(), minNeighorId));
+ out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
}
}
@@ -490,9 +490,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
@Override
- public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
- Tuple2<Long, Edge<Long, Long>> secondEdge) {
- if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+ public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+ Tuple2<Long, Long> secondEdge) {
+ if(firstEdge.f1 < secondEdge.f1) {
return firstEdge;
} else {
return secondEdge;
http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 5f23569..62d6dc9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -523,14 +523,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+ private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long> {
@Override
- public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
- long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
- return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
- new Vertex<Long, Long>(firstNeighbor.f0, sum));
+ public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+ Tuple2<Long, Long> secondNeighbor) {
+ long sum = firstNeighbor.f1 + secondNeighbor.f1;
+ return new Tuple2<Long, Long>(firstNeighbor.f0, sum);
}
}
@@ -554,14 +553,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+ private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long> {
@Override
- public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
- Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
- long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
- return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
- new Vertex<Long, Long>(firstNeighbor.f0, sum));
+ public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+ Tuple2<Long, Long> secondNeighbor) {
+ long sum = firstNeighbor.f1 + secondNeighbor.f1;
+ return new Tuple2<Long, Long>(firstNeighbor.f0, sum);
}
}