You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/18 17:38:33 UTC
flink git commit: [FLINK-3770] [gelly] Fix TriangleEnumerator performance
Repository: flink
Updated Branches:
refs/heads/master 367687df1 -> 85fcfc4d4
[FLINK-3770] [gelly] Fix TriangleEnumerator performance
Implement the optimization of ordering edges by degree, and add a JoinHint
to the join of triads with edges.
This closes #1899
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85fcfc4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85fcfc4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85fcfc4d
Branch: refs/heads/master
Commit: 85fcfc4d43feddacb76f2d14229229340174fed8
Parents: 367687d
Author: Greg Hogan <co...@greghogan.com>
Authored: Sat Apr 16 06:04:07 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Apr 18 11:37:41 2016 -0400
----------------------------------------------------------------------
.../flink/graph/examples/data/TriangleCountData.java | 2 +-
.../flink/graph/library/TriangleEnumeratorITCase.java | 4 ++--
.../flink/graph/library/TriangleEnumerator.java | 14 ++++++++------
3 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
index 71b874c..a14010f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -57,7 +57,7 @@ public class TriangleCountData {
ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
ret.add(new Tuple3<>(1L,2L,3L));
ret.add(new Tuple3<>(2L,3L,6L));
- ret.add(new Tuple3<>(3L,4L,5L));
+ ret.add(new Tuple3<>(4L,3L,5L));
return ret;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
index 56b3289..9550405 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -47,9 +47,9 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
env);
List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
- List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();
+ List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();
- Assert.assertEquals(actualOutput.size(), expectedResult.size());
+ Assert.assertEquals(expectedResult.size(), actualOutput.size());
for(Tuple3<Long,Long,Long> resultTriangle:actualOutput) {
Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 3842e6c..8272d8f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -81,7 +82,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
.reduceGroup(new TriadBuilder<K>())
// filter triads
- .join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
+ .join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
return triangles;
}
@@ -165,7 +166,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
public EdgeWithDegrees<K> reduce(EdgeWithDegrees<K> edge1, EdgeWithDegrees<K> edge2) throws Exception {
// copy first edge
- /*\t*/outEdge.copyFrom(edge1);
+ outEdge.copyFrom(edge1);
// set missing degree
if (edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
@@ -173,6 +174,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
outEdge.setSecondDegree(edge2.getSecondDegree());
}
+
return outEdge;
}
}
@@ -183,7 +185,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
@SuppressWarnings("serial")
private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
- private final Edge<K, NullValue> outEdge = new Edge<>();
+ private Edge<K, NullValue> outEdge = new Edge<>();
@Override
public Edge<K, NullValue> map(EdgeWithDegrees<K> inEdge) throws Exception {
@@ -195,7 +197,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
// flip vertices if first degree is larger than second degree.
if (inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
- outEdge.reverse();
+ outEdge = outEdge.reverse();
}
// return edge
@@ -214,8 +216,8 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
public Edge<K, NullValue> map(Edge<K, NullValue> inEdge) throws Exception {
// flip vertices if necessary
- if (inEdge.getSource().compareTo(inEdge.getTarget()) < 0) {
- inEdge.reverse();
+ if (inEdge.getSource().compareTo(inEdge.getTarget()) > 0) {
+ inEdge = inEdge.reverse();
}
return inEdge;