You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/26 14:32:08 UTC
[3/5] flink git commit: [FLINK-6648] [gelly] Transforms for Gelly examples
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
index 9e26dfe..180c46f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -35,9 +35,25 @@ public abstract class TriangleListingBase<K, VV, EV, R>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
// Optional configuration
+ protected boolean permuteResults;
+
protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
/**
+ * By default only one result is output for each triangle, whether vertices
+ * are sorted or unsorted. When permutation is enabled a result is instead
+ * output for each of the six permutations of the three vertex IDs.
+ *
+ * <p>Permutation takes precedence over vertex sorting: the directed and
+ * undirected TriangleListing implementations check this flag first, so
+ * when permutation is enabled the sort-triangle-vertices option is not
+ * applied to the output.
+ *
+ * @param permuteResults whether output results should be permuted
+ * @return this
+ */
+ public TriangleListingBase<K, VV, EV, R> setPermuteResults(boolean permuteResults) {
+ this.permuteResults = permuteResults;
+
+ return this;
+ }
+
+ /**
* Normalize the triangle listing such that for each result (K0, K1, K2)
* the vertex IDs are sorted K0 < K1 < K2.
*
@@ -51,6 +67,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
}
@Override
+ protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+ if (!super.canMergeConfigurationWith(other)) {
+ return false;
+ }
+
+ // NOTE(review): relies on the superclass check to guarantee that `other`
+ // is a TriangleListingBase before this cast; that check is not visible here.
+ // A wildcard type avoids the raw-type local without any unchecked cast.
+ TriangleListingBase<?, ?, ?, ?> rhs = (TriangleListingBase<?, ?, ?, ?>) other;
+
+ // Configurations are only mergeable when both agree on result permutation.
+ return permuteResults == rhs.permuteResults;
+ }
+
+ @Override
protected final void mergeConfiguration(GraphAlgorithmWrappingBase other) {
super.mergeConfiguration(other);
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index beddd24..00b2210 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -18,6 +18,7 @@
package org.apache.flink.graph.library.clustering.directed;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -121,7 +122,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
.with(new ProjectTriangles<K>())
.name("Triangle listing");
- if (sortTriangleVertices.get()) {
+ if (permuteResults) {
+ triangles = triangles
+ .flatMap(new PermuteResult<K>())
+ .name("Permute triangle vertices");
+ } else if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<K>())
.name("Sort triangle vertices");
@@ -309,6 +314,98 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
}
/**
+ * Output each input and an additional result for each of the five
+ * permutations of the three vertex IDs.
+ *
+ * @param <T> ID type
+ */
+ private static class PermuteResult<T>
+ implements FlatMapFunction<Result<T>, Result<T>> {
+ @Override
+ public void flatMap(Result<T> value, Collector<Result<T>> out)
+ throws Exception {
+ // Capture the input before it is first emitted. Flink does not guarantee
+ // that an emitted object is left unchanged, so every later permutation is
+ // rebuilt from these locals instead of being read back from the record
+ // that was just collected.
+ T id0 = value.getVertexId0();
+ T id1 = value.getVertexId1();
+ T id2 = value.getVertexId2();
+ int mask = value.getBitmask().getValue();
+
+ // 0, 1, 2
+ out.collect(value);
+
+ // 1, 0, 2
+ update(value, id1, id0, id2, permuteMask(mask, 1, 0, 2));
+ out.collect(value);
+
+ // 1, 2, 0
+ update(value, id1, id2, id0, permuteMask(mask, 1, 2, 0));
+ out.collect(value);
+
+ // 0, 2, 1
+ update(value, id0, id2, id1, permuteMask(mask, 0, 2, 1));
+ out.collect(value);
+
+ // 2, 0, 1
+ update(value, id2, id0, id1, permuteMask(mask, 2, 0, 1));
+ out.collect(value);
+
+ // 2, 1, 0
+ update(value, id2, id1, id0, permuteMask(mask, 2, 1, 0));
+ out.collect(value);
+ }
+
+ /**
+ * Overwrite the vertex IDs and edge-direction bitmask of a result.
+ */
+ private void update(Result<T> result, T vertexId0, T vertexId1, T vertexId2, int bitmask) {
+ result.setVertexId0(vertexId0);
+ result.setVertexId1(vertexId1);
+ result.setVertexId2(vertexId2);
+ result.setBitmask((byte) bitmask);
+ }
+
+ /**
+ * Compute the bitmask for the triangle whose vertices, in order, are the
+ * input vertices at positions {@code a}, {@code b}, and {@code c}.
+ *
+ * <p>The bitmask holds two direction bits per vertex pair: pair (0, 1) in
+ * bits 5-4, pair (0, 2) in bits 3-2, and pair (1, 2) in bits 1-0.
+ */
+ private static int permuteMask(int mask, int a, int b, int c) {
+ return (edge(mask, a, b) << 4) | (edge(mask, a, c) << 2) | edge(mask, b, c);
+ }
+
+ /**
+ * Return the two direction bits of the input pair ({@code x}, {@code y}),
+ * oriented from {@code x} towards {@code y}.
+ */
+ private static int edge(int mask, int x, int y) {
+ if (x < y) {
+ // (0, 1) -> shift 4, (0, 2) -> shift 2, (1, 2) -> shift 0
+ return (mask >>> (2 * (3 - x - y))) & 0b11;
+ }
+
+ // Reversing the order of a pair swaps its two direction bits.
+ int forward = edge(mask, y, x);
+ return ((forward & 0b01) << 1) | ((forward & 0b10) >>> 1);
+ }
+ }
+
+ /**
* Reorders the vertices of each emitted triangle (K0, K1, K2, bitmask)
* into sorted order such that K0 < K1 < K2.
*
@@ -384,7 +481,7 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
this.bitmask = bitmask;
}
- private void setBitmask(byte bitmask) {
+ public void setBitmask(byte bitmask) {
this.bitmask.setValue(bitmask);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index c714bed..2472744 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -113,7 +113,11 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
.with(new ProjectTriangles<K>())
.name("Triangle listing");
- if (sortTriangleVertices.get()) {
+ if (permuteResults) {
+ triangles = triangles
+ .flatMap(new PermuteResult<K>())
+ .name("Permute triangle vertices");
+ } else if (sortTriangleVertices.get()) {
triangles = triangles
.map(new SortTriangleVertices<K>())
.name("Sort triangle vertices");
@@ -250,6 +254,59 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
}
/**
+ * Output each input and an additional result for each of the five
+ * permutations of the three vertex IDs.
+ *
+ * @param <T> ID type
+ */
+ private static class PermuteResult<T>
+ implements FlatMapFunction<Result<T>, Result<T>> {
+ @Override
+ public void flatMap(Result<T> value, Collector<Result<T>> out)
+ throws Exception {
+ // Capture the vertex IDs before the input is first emitted. Flink does
+ // not guarantee that an emitted object is left unchanged, so every later
+ // permutation is rebuilt from these locals instead of being read back
+ // from the record that was just collected.
+ T id0 = value.getVertexId0();
+ T id1 = value.getVertexId1();
+ T id2 = value.getVertexId2();
+
+ // 0, 1, 2
+ out.collect(value);
+
+ // 1, 0, 2
+ update(value, id1, id0, id2);
+ out.collect(value);
+
+ // 1, 2, 0
+ update(value, id1, id2, id0);
+ out.collect(value);
+
+ // 0, 2, 1
+ update(value, id0, id2, id1);
+ out.collect(value);
+
+ // 2, 0, 1
+ update(value, id2, id0, id1);
+ out.collect(value);
+
+ // 2, 1, 0
+ update(value, id2, id1, id0);
+ out.collect(value);
+ }
+
+ /**
+ * Overwrite the three vertex IDs of a result.
+ */
+ private void update(Result<T> result, T vertexId0, T vertexId1, T vertexId2) {
+ result.setVertexId0(vertexId0);
+ result.setVertexId1(vertexId1);
+ result.setVertexId2(vertexId2);
+ }
+ }
+
+ /**
* Reorders the vertices of each emitted triangle (K0, K1, K2)
* into sorted order such that K0 < K1 < K2.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 78dff0b..d761f60 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.asm.result.BinaryResult.MirrorResult;
import org.apache.flink.graph.asm.result.BinaryResultBase;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
@@ -82,6 +83,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private float minimumRatio = 0.0f;
+ private boolean mirrorResults;
+
/**
* Filter out Adamic-Adar scores less than the given minimum.
*
@@ -110,6 +113,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
return this;
}
+ /**
+ * Control whether each vertex pair is reported in both orders. Normally a
+ * single result is output for each pair of vertices; with mirroring
+ * enabled, a second result with the vertex order flipped is output as well.
+ *
+ * @param mirrorResults whether output results should be mirrored
+ * @return this
+ */
+ public AdamicAdar<K, VV, EV> setMirrorResults(boolean mirrorResults) {
+ this.mirrorResults = mirrorResults;
+ return this;
+ }
+
@Override
protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
if (!super.canMergeConfigurationWith(other)) {
@@ -194,7 +211,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
}
- return scores;
+ if (mirrorResults) {
+ return scores
+ .flatMap(new MirrorResult<K, Result<K>>())
+ .name("Mirror results");
+ } else {
+ return scores;
+ }
}
/**
@@ -411,7 +434,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
*/
public static class Result<T>
extends BinaryResultBase<T>
- implements PrintableResult {
+ implements PrintableResult, Comparable<Result<T>> {
private FloatValue adamicAdarScore = new FloatValue();
/**
@@ -453,6 +476,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+ "), adamic-adar score: " + adamicAdarScore;
}
+ @Override
+ public int compareTo(Result<T> o) {
+ return Float.compare(adamicAdarScore.getValue(), o.adamicAdarScore.getValue());
+ }
+
// ----------------------------------------------------------------------------------------
public static final int HASH_SEED = 0xe405f6d1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
index b002956..e8f8659 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.types.NullValue;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -42,7 +43,7 @@ public class TriangleListingTest
extends AsmTestBase {
@Test
- public void testSimpleGraph()
+ public void testSimpleGraphSorted()
throws Exception {
DataSet<Result<IntValue>> tl = directedSimpleGraph
.run(new TriangleListing<IntValue, NullValue, NullValue>()
@@ -56,6 +57,38 @@ extends AsmTestBase {
}
@Test
+ public void testSimpleGraphPermuted()
+ throws Exception {
+ DataSet<Result<IntValue>> tl = directedSimpleGraph
+ .run(new TriangleListing<IntValue, NullValue, NullValue>()
+ .setPermuteResults(true));
+
+ // Each triangle appears once per ordering of its three vertex IDs, with
+ // the edge directions re-expressed relative to the new vertex order.
+ String expectedResult =
+ // permutations of triangle (0,1,2) with bitmask 41
+ "1st vertex ID: 0, 2nd vertex ID: 1, 3rd vertex ID: 2, edge directions: 0->1, 0->2, 1<-2\n" +
+ "1st vertex ID: 0, 2nd vertex ID: 2, 3rd vertex ID: 1, edge directions: 0->2, 0->1, 2->1\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 0, 3rd vertex ID: 2, edge directions: 1<-0, 1<-2, 0->2\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 0, edge directions: 1<-2, 1<-0, 2<-0\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 0, 3rd vertex ID: 1, edge directions: 2<-0, 2->1, 0->1\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 0, edge directions: 2->1, 2<-0, 1<-0\n" +
+ // permutations of triangle (1,2,3) with bitmask 22
+ "1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 3, edge directions: 1<-2, 1<-3, 2->3\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 3, 3rd vertex ID: 2, edge directions: 1<-3, 1<-2, 3<-2\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 3, edge directions: 2->1, 2->3, 1<-3\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 3, 3rd vertex ID: 1, edge directions: 2->3, 2->1, 3->1\n" +
+ "1st vertex ID: 3, 2nd vertex ID: 1, 3rd vertex ID: 2, edge directions: 3->1, 3<-2, 1<-2\n" +
+ "1st vertex ID: 3, 2nd vertex ID: 2, 3rd vertex ID: 1, edge directions: 3<-2, 3->1, 2->1";
+
+ List<Result<IntValue>> results = tl.collect();
+ List<String> printableStrings = new ArrayList<>(results.size());
+
+ for (Result<IntValue> triangle : results) {
+ printableStrings.add(triangle.toPrintableString());
+ }
+
+ TestBaseUtils.compareResultAsText(printableStrings, expectedResult);
+ }
+
+ @Test
public void testCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8695a210/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index ab99faa..6af1b01 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -31,6 +31,9 @@ import org.apache.flink.types.NullValue;
import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
/**
@@ -40,7 +43,7 @@ public class TriangleListingTest
extends AsmTestBase {
@Test
- public void testSimpleGraph()
+ public void testSimpleGraphSorted()
throws Exception {
DataSet<Result<IntValue>> tl = undirectedSimpleGraph
.run(new TriangleListing<IntValue, NullValue, NullValue>()
@@ -54,6 +57,38 @@ extends AsmTestBase {
}
@Test
+ public void testSimpleGraphPermuted()
+ throws Exception {
+ DataSet<Result<IntValue>> tl = undirectedSimpleGraph
+ .run(new TriangleListing<IntValue, NullValue, NullValue>()
+ .setPermuteResults(true));
+
+ // Each triangle appears once per ordering of its three vertex IDs.
+ String expectedResult =
+ // permutations of triangle (0,1,2)
+ "1st vertex ID: 0, 2nd vertex ID: 1, 3rd vertex ID: 2\n" +
+ "1st vertex ID: 0, 2nd vertex ID: 2, 3rd vertex ID: 1\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 0, 3rd vertex ID: 2\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 0\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 0, 3rd vertex ID: 1\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 0\n" +
+ // permutations of triangle (1,2,3)
+ "1st vertex ID: 1, 2nd vertex ID: 2, 3rd vertex ID: 3\n" +
+ "1st vertex ID: 1, 2nd vertex ID: 3, 3rd vertex ID: 2\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 1, 3rd vertex ID: 3\n" +
+ "1st vertex ID: 2, 2nd vertex ID: 3, 3rd vertex ID: 1\n" +
+ "1st vertex ID: 3, 2nd vertex ID: 1, 3rd vertex ID: 2\n" +
+ "1st vertex ID: 3, 2nd vertex ID: 2, 3rd vertex ID: 1";
+
+ List<Result<IntValue>> results = tl.collect();
+ List<String> printableStrings = new ArrayList<>(results.size());
+
+ for (Result<IntValue> triangle : results) {
+ printableStrings.add(triangle.toPrintableString());
+ }
+
+ TestBaseUtils.compareResultAsText(printableStrings, expectedResult);
+ }
+
+ @Test
public void testCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;