You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/26 14:32:08 UTC

[3/5] flink git commit: [FLINK-6648] [gelly] Transforms for Gelly examples

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
index 9e26dfe..180c46f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -35,9 +35,25 @@ public abstract class TriangleListingBase<K, VV, EV, R>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 
 	// Optional configuration
+	protected boolean permuteResults;
+
 	protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
 
 	/**
+	 * By default only one result is output for each triangle, sorted or not.
+	 * When permutation is enabled a result is instead output for each of the six
+	 * permutations of the three vertex IDs, taking precedence over vertex sorting.
+	 *
+	 * @param permuteResults whether output results should be permuted
+	 * @return this
+	 */
+	public TriangleListingBase<K, VV, EV, R> setPermuteResults(boolean permuteResults) {
+		this.permuteResults = permuteResults;
+
+		return this;
+	}
+
+	/**
 	 * Normalize the triangle listing such that for each result (K0, K1, K2)
 	 * the vertex IDs are sorted K0 < K1 < K2.
 	 *
@@ -51,6 +67,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 	}
 
 	@Override
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) { // defer to the parent's compatibility check first
+			return false;
+		}
+
+		TriangleListingBase rhs = (TriangleListingBase) other; // NOTE(review): raw, unchecked cast; presumably the super check guarantees the type - confirm
+
+		return permuteResults == rhs.permuteResults; // mergeable only when both sides agree on permutation
+	}
+
+	@Override
 	protected final void mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		super.mergeConfiguration(other);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index beddd24..00b2210 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.graph.library.clustering.directed;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -121,7 +122,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 			.with(new ProjectTriangles<K>())
 				.name("Triangle listing");
 
-		if (sortTriangleVertices.get()) {
+		if (permuteResults) {
+			triangles = triangles
+				.flatMap(new PermuteResult<K>())
+					.name("Permute triangle vertices");
+		} else if (sortTriangleVertices.get()) {
 			triangles = triangles
 				.map(new SortTriangleVertices<K>())
 					.name("Sort triangle vertices");
@@ -309,6 +314,98 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 	}
 
 	/**
+	 * Output each input and an additional result for each of the five other
+	 * permutations of the three vertex IDs, permuting the bitmask to match.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class PermuteResult<T>
+	implements FlatMapFunction<Result<T>, Result<T>> {
+		@Override
+		public void flatMap(Result<T> value, Collector<Result<T>> out)
+				throws Exception {
+			T tmp;
+
+			int f0f1, f0f2, f1f2; // each holds the bits read from that edge's slot, already shifted to their new slot
+
+			byte bitmask = value.getBitmask().getValue(); // two bits per edge: 0b110000 = (0,1), 0b001100 = (0,2), 0b000011 = (1,2)
+
+			// 0, 1, 2 (labels give the original vertex indices in emitted order; the same object is mutated and re-emitted)
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 1
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp);
+
+			f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1); // edge (0,1) stays in place with its two bits exchanged
+			f0f2 = (bitmask & 0b001100) >>> 2; // old edge (0,2) bits move down to the (1,2) slot
+			f1f2 = (bitmask & 0b000011) << 2; // old edge (1,2) bits move up to the (0,2) slot
+
+			bitmask = (byte) (f0f1 | f0f2 | f1f2);
+			value.setBitmask(bitmask);
+
+			// 1, 0, 2
+			out.collect(value);
+
+			tmp = value.getVertexId1(); // swap vertices 1 and 2
+			value.setVertexId1(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			f0f1 = (bitmask & 0b110000) >>> 2; // old edge (0,1) bits move down to the (0,2) slot
+			f0f2 = (bitmask & 0b001100) << 2; // old edge (0,2) bits move up to the (0,1) slot
+			f1f2 = ((bitmask & 0b000010) >>> 1) | ((bitmask & 0b000001) << 1); // edge (1,2) stays in place with its two bits exchanged
+
+			bitmask = (byte) (f0f1 | f0f2 | f1f2);
+			value.setBitmask(bitmask);
+
+			// 1, 2, 0
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 2
+			value.setVertexId0(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			f0f1 = ((bitmask & 0b100000) >>> 5) | ((bitmask & 0b010000) >>> 3); // old edge (0,1) bits move to the (1,2) slot, exchanged
+			f0f2 = ((bitmask & 0b001000) >>> 1) | ((bitmask & 0b000100) << 1); // edge (0,2) stays in place with its two bits exchanged
+			f1f2 = ((bitmask & 0b000010) << 3) | ((bitmask & 0b000001) << 5); // old edge (1,2) bits move to the (0,1) slot, exchanged
+
+			bitmask = (byte) (f0f1 | f0f2 | f1f2);
+			value.setBitmask(bitmask);
+
+			// 0, 2, 1
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 1 (same bit moves as the first swap)
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp);
+
+			f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
+			f0f2 = (bitmask & 0b001100) >>> 2;
+			f1f2 = (bitmask & 0b000011) << 2;
+
+			bitmask = (byte) (f0f1 | f0f2 | f1f2);
+			value.setBitmask(bitmask);
+
+			// 2, 0, 1
+			out.collect(value);
+
+			tmp = value.getVertexId1(); // swap vertices 1 and 2 (same bit moves as the second swap)
+			value.setVertexId1(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			f0f1 = (bitmask & 0b110000) >>> 2;
+			f0f2 = (bitmask & 0b001100) << 2;
+			f1f2 = ((bitmask & 0b000010) >>> 1) | ((bitmask & 0b000001) << 1);
+
+			bitmask = (byte) (f0f1 | f0f2 | f1f2);
+			value.setBitmask(bitmask);
+
+			// 2, 1, 0
+			out.collect(value);
+		}
+	}
+
+	/**
 	 * Reorders the vertices of each emitted triangle (K0, K1, K2, bitmask)
 	 * into sorted order such that K0 < K1 < K2.
 	 *
@@ -384,7 +481,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 			this.bitmask = bitmask;
 		}
 
-		private void setBitmask(byte bitmask) {
+		public void setBitmask(byte bitmask) {
 			this.bitmask.setValue(bitmask);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index c714bed..2472744 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -113,7 +113,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 			.with(new ProjectTriangles<K>())
 				.name("Triangle listing");
 
-		if (sortTriangleVertices.get()) {
+		if (permuteResults) {
+			triangles = triangles
+				.flatMap(new PermuteResult<K>())
+					.name("Permute triangle vertices");
+		} else if (sortTriangleVertices.get()) {
 			triangles = triangles
 				.map(new SortTriangleVertices<K>())
 					.name("Sort triangle vertices");
@@ -250,6 +254,59 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 	}
 
 	/**
+	 * Output each input and an additional result for each of the five other
+	 * permutations of the three vertex IDs.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class PermuteResult<T>
+	implements FlatMapFunction<Result<T>, Result<T>> {
+		@Override
+		public void flatMap(Result<T> value, Collector<Result<T>> out)
+				throws Exception {
+			T tmp;
+
+			// 0, 1, 2 (labels give the original vertex indices in emitted order; the same object is mutated and re-emitted)
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 1
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp);
+
+			// 1, 0, 2
+			out.collect(value);
+
+			tmp = value.getVertexId1(); // swap vertices 1 and 2
+			value.setVertexId1(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			// 1, 2, 0
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 2
+			value.setVertexId0(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			// 0, 2, 1
+			out.collect(value);
+
+			tmp = value.getVertexId0(); // swap vertices 0 and 1
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp);
+
+			// 2, 0, 1
+			out.collect(value);
+
+			tmp = value.getVertexId1(); // swap vertices 1 and 2
+			value.setVertexId1(value.getVertexId2());
+			value.setVertexId2(tmp);
+
+			// 2, 1, 0
+			out.collect(value);
+		}
+	}
+
+	/**
 	 * Reorders the vertices of each emitted triangle (K0, K1, K2)
 	 * into sorted order such that K0 < K1 < K2.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 78dff0b..d761f60 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.asm.result.BinaryResult.MirrorResult;
 import org.apache.flink.graph.asm.result.BinaryResultBase;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
@@ -82,6 +83,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private float minimumRatio = 0.0f;
 
+	private boolean mirrorResults;
+
 	/**
 	 * Filter out Adamic-Adar scores less than the given minimum.
 	 *
@@ -110,6 +113,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
+	/**
+	 * By default only one result is output for each pair of vertices. When
+	 * mirroring is enabled, a second result with the vertex order flipped is
+	 * also output for each pair of vertices.
+	 *
+	 * @param mirrorResults whether output results should be mirrored
+	 * @return this
+	 */
+	public AdamicAdar<K, VV, EV> setMirrorResults(boolean mirrorResults) {
+		this.mirrorResults = mirrorResults;
+
+		return this;
+	}
+
 	@Override
 	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
 		if (!super.canMergeConfigurationWith(other)) {
@@ -194,7 +211,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
 		}
 
-		return scores;
+		if (mirrorResults) {
+			return scores
+				.flatMap(new MirrorResult<K, Result<K>>())
+					.name("Mirror results");
+		} else {
+			return scores;
+		}
 	}
 
 	/**
@@ -411,7 +434,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 */
 	public static class Result<T>
 	extends BinaryResultBase<T>
-	implements PrintableResult {
+	implements PrintableResult, Comparable<Result<T>> {
 		private FloatValue adamicAdarScore = new FloatValue();
 
 		/**
@@ -453,6 +476,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				+ "), adamic-adar score: " + adamicAdarScore;
 		}
 
+		@Override
+		public int compareTo(Result<T> o) { // orders results by Adamic-Adar score only; vertex IDs are not compared
+			return Float.compare(adamicAdarScore.getValue(), o.adamicAdarScore.getValue());
+		}
+
 		// ----------------------------------------------------------------------------------------
 
 		public static final int HASH_SEED = 0xe405f6d1;

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
index b002956..e8f8659 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.types.NullValue;
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class TriangleListingTest
 extends AsmTestBase {
 
 	@Test
-	public void testSimpleGraph()
+	public void testSimpleGraphSorted()
 			throws Exception {
 		DataSet<Result<IntValue>> tl = directedSimpleGraph
 			.run(new TriangleListing<IntValue, NullValue, NullValue>()
@@ -56,6 +57,38 @@ extends AsmTestBase {
 	}
 
 	@Test
+	public void testSimpleGraphPermuted()
+			throws Exception {
+		DataSet<Result<IntValue>> tl = directedSimpleGraph
+			.run(new TriangleListing<IntValue, NullValue, NullValue>()
+				.setPermuteResults(true));
+
+		String expectedResult =
+			// the six permutations of triangle (0,1,2) with bitmask 41 = 0b101001
+			"1st vertex ID: 0, 2nd vertex ID: 1, 3rd vertex ID: 2, edge directions: 0->1, 0->2, 1<-2\n" +
+			"1st vertex ID: 0, 2nd vertex ID: 2, 3rd vertex ID: 1, edge directions: 0->2, 0->1, 2->1\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 0, 3rd vertex ID: 2, edge directions: 1<-0, 1<-2, 0->2\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 0, edge directions: 1<-2, 1<-0, 2<-0\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 0, 3rd vertex ID: 1, edge directions: 2<-0, 2->1, 0->1\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 0, edge directions: 2->1, 2<-0, 1<-0\n" +
+			// the six permutations of triangle (1,2,3) with bitmask 22 = 0b010110
+			"1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 3, edge directions: 1<-2, 1<-3, 2->3\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 3, 3rd vertex ID: 2, edge directions: 1<-3, 1<-2, 3<-2\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 3, edge directions: 2->1, 2->3, 1<-3\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 3, 3rd vertex ID: 1, edge directions: 2->3, 2->1, 3->1\n" +
+			"1st vertex ID: 3, 2nd vertex ID: 1, 3rd vertex ID: 2, edge directions: 3->1, 3<-2, 1<-2\n" +
+			"1st vertex ID: 3, 2nd vertex ID: 2, 3rd vertex ID: 1, edge directions: 3<-2, 3->1, 2->1";
+
+		List<String> printableStrings = new ArrayList<>();
+
+		for (Result<IntValue> result : tl.collect()) { // compare printable strings so the permuted edge directions are checked too
+			printableStrings.add(result.toPrintableString());
+		}
+
+		TestBaseUtils.compareResultAsText(printableStrings, expectedResult);
+	}
+
+	@Test
 	public void testCompleteGraph()
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index ab99faa..6af1b01 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -31,6 +31,9 @@ import org.apache.flink.types.NullValue;
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -40,7 +43,7 @@ public class TriangleListingTest
 extends AsmTestBase {
 
 	@Test
-	public void testSimpleGraph()
+	public void testSimpleGraphSorted()
 			throws Exception {
 		DataSet<Result<IntValue>> tl = undirectedSimpleGraph
 			.run(new TriangleListing<IntValue, NullValue, NullValue>()
@@ -54,6 +57,38 @@ extends AsmTestBase {
 	}
 
 	@Test
+	public void testSimpleGraphPermuted()
+			throws Exception {
+		DataSet<Result<IntValue>> tl = undirectedSimpleGraph
+			.run(new TriangleListing<IntValue, NullValue, NullValue>()
+				.setPermuteResults(true));
+
+		String expectedResult =
+			// the six permutations of triangle (0,1,2)
+			"1st vertex ID: 0, 2nd vertex ID: 1, 3rd vertex ID: 2\n" +
+			"1st vertex ID: 0, 2nd vertex ID: 2, 3rd vertex ID: 1\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 0, 3rd vertex ID: 2\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 0\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 0, 3rd vertex ID: 1\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 0\n" +
+			// the six permutations of triangle (1,2,3)
+			"1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 3\n" +
+			"1st vertex ID: 1, 2nd vertex ID: 3, 3rd vertex ID: 2\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 3\n" +
+			"1st vertex ID: 2, 2nd vertex ID: 3, 3rd vertex ID: 1\n" +
+			"1st vertex ID: 3, 2nd vertex ID: 1, 3rd vertex ID: 2\n" +
+			"1st vertex ID: 3, 2nd vertex ID: 2, 3rd vertex ID: 1";
+
+		List<String> printableStrings = new ArrayList<>();
+
+		for (Result<IntValue> result : tl.collect()) { // results are compared through their printable form
+			printableStrings.add(result.toPrintableString());
+		}
+
+		TestBaseUtils.compareResultAsText(printableStrings, expectedResult);
+	}
+
+	@Test
 	public void testCompleteGraph()
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;