You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/04/18 17:38:33 UTC

flink git commit: [FLINK-3770] [gelly] Fix TriangleEnumerator performance

Repository: flink
Updated Branches:
  refs/heads/master 367687df1 -> 85fcfc4d4


[FLINK-3770] [gelly] Fix TriangleEnumerator performance

Make the optimization of ordering edges by degree take effect by using
the edge returned from reverse(), and add a JoinHint to the join of
triads with edges.

This closes #1899


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85fcfc4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85fcfc4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85fcfc4d

Branch: refs/heads/master
Commit: 85fcfc4d43feddacb76f2d14229229340174fed8
Parents: 367687d
Author: Greg Hogan <co...@greghogan.com>
Authored: Sat Apr 16 06:04:07 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Apr 18 11:37:41 2016 -0400

----------------------------------------------------------------------
 .../flink/graph/examples/data/TriangleCountData.java  |  2 +-
 .../flink/graph/library/TriangleEnumeratorITCase.java |  4 ++--
 .../flink/graph/library/TriangleEnumerator.java       | 14 ++++++++------
 3 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
index 71b874c..a14010f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -57,7 +57,7 @@ public class TriangleCountData {
 		ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
 		ret.add(new Tuple3<>(1L,2L,3L));
 		ret.add(new Tuple3<>(2L,3L,6L));
-		ret.add(new Tuple3<>(3L,4L,5L));
+		ret.add(new Tuple3<>(4L,3L,5L));
 		return ret;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
index 56b3289..9550405 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -47,9 +47,9 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
 				env);
 
 		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
-		List<Tuple3<Long,Long,Long>>  expectedResult = TriangleCountData.getListOfTriangles();
+		List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();
 
-		Assert.assertEquals(actualOutput.size(), expectedResult.size());
+		Assert.assertEquals(expectedResult.size(), actualOutput.size());
 		for(Tuple3<Long,Long,Long> resultTriangle:actualOutput)	{
 			Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/85fcfc4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 3842e6c..8272d8f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -81,7 +82,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
 				.reduceGroup(new TriadBuilder<K>())
 				// filter triads
-				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
+				.join(edgesById, JoinHint.REPARTITION_HASH_SECOND).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
 
 		return triangles;
 	}
@@ -165,7 +166,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 		public EdgeWithDegrees<K> reduce(EdgeWithDegrees<K> edge1, EdgeWithDegrees<K> edge2) throws Exception {
 
 			// copy first edge
-		/*\t*/outEdge.copyFrom(edge1);
+			outEdge.copyFrom(edge1);
 
 			// set missing degree
 			if (edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
@@ -173,6 +174,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 			} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
 				outEdge.setSecondDegree(edge2.getSecondDegree());
 			}
+
 			return outEdge;
 		}
 	}
@@ -183,7 +185,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 	@SuppressWarnings("serial")
 	private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
 
-		private final Edge<K, NullValue> outEdge = new Edge<>();
+		private Edge<K, NullValue> outEdge = new Edge<>();
 
 		@Override
 		public Edge<K, NullValue> map(EdgeWithDegrees<K> inEdge) throws Exception {
@@ -195,7 +197,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 
 			// flip vertices if first degree is larger than second degree.
 			if (inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
-				outEdge.reverse();
+				outEdge = outEdge.reverse();
 			}
 
 			// return edge
@@ -214,8 +216,8 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 		public Edge<K, NullValue> map(Edge<K, NullValue> inEdge) throws Exception {
 
 			// flip vertices if necessary
-			if (inEdge.getSource().compareTo(inEdge.getTarget()) < 0) {
-				inEdge.reverse();
+			if (inEdge.getSource().compareTo(inEdge.getTarget()) > 0) {
+				inEdge = inEdge.reverse();
 			}
 
 			return inEdge;