You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:34 UTC

[27/50] [abbrv] flink git commit: [FLINK-6382] [gelly] Support additional types for generated graphs in Gelly examples

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
index a5ea486..8c5ed86 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class GraphMetricsITCase
 extends DriverBaseITCase {
 
-	public GraphMetricsITCase(TestExecutionMode mode) {
-		super(mode);
+	public GraphMetricsITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String order, String output) {
+		return new String[] {
+			"--algorithm", "GraphMetrics", "--order", order,
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", order,
+			"--output", output};
 	}
 
 	@Test
@@ -43,58 +50,110 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testWithDirectedRMatIntegerGraph() throws Exception {
+	public void testWithSmallDirectedRMatIntegerGraph() throws Exception {
 		String expected = "\n" +
 			"Vertex metrics:\n" +
-			"  vertex count: 902\n" +
-			"  edge count: 12,009\n" +
-			"  unidirectional edge count: 8,875\n" +
-			"  bidirectional edge count: 1,567\n" +
-			"  average degree: 13.314\n" +
-			"  density: 0.01477663\n" +
-			"  triplet count: 1,003,442\n" +
-			"  maximum degree: 463\n" +
-			"  maximum out degree: 334\n" +
-			"  maximum in degree: 342\n" +
-			"  maximum triplets: 106,953\n" +
+			"  vertex count: 117\n" +
+			"  edge count: 1,168\n" +
+			"  unidirectional edge count: 686\n" +
+			"  bidirectional edge count: 241\n" +
+			"  average degree: 9.983\n" +
+			"  density: 0.08605953\n" +
+			"  triplet count: 29,286\n" +
+			"  maximum degree: 91\n" +
+			"  maximum out degree: 77\n" +
+			"  maximum in degree: 68\n" +
+			"  maximum triplets: 4,095\n" +
 			"\n" +
 			"Edge metrics:\n" +
-			"  triangle triplet count: 107,817\n" +
-			"  rectangle triplet count: 315,537\n" +
-			"  maximum triangle triplets: 820\n" +
-			"  maximum rectangle triplets: 3,822\n";
+			"  triangle triplet count: 4,575\n" +
+			"  rectangle triplet count: 11,756\n" +
+			"  maximum triangle triplets: 153\n" +
+			"  maximum rectangle triplets: 391\n";
+
+		expectedOutput(parameters(7, "directed", "hash"), expected);
+		expectedOutput(parameters(7, "directed", "print"), expected);
+	}
+
+	@Test
+	public void testWithLargeDirectedRMatIntegerGraph() throws Exception {
+		// skip 'byte' which cannot store vertex IDs for scale > 8
+		Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+		// skip 'string' which does not compare numerically and generates a different triangle count
+		Assume.assumeFalse(idType.equals("string") || idType.equals("nativeString"));
 
-		String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed",
-			"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-			"--output"};
+		String expected = "\n" +
+			"Vertex metrics:\n" +
+			"  vertex count: 3,349\n" +
+			"  edge count: 53,368\n" +
+			"  unidirectional edge count: 43,602\n" +
+			"  bidirectional edge count: 4,883\n" +
+			"  average degree: 15.936\n" +
+			"  density: 0.00475971\n" +
+			"  triplet count: 9,276,207\n" +
+			"  maximum degree: 1,356\n" +
+			"  maximum out degree: 921\n" +
+			"  maximum in degree: 966\n" +
+			"  maximum triplets: 918,690\n" +
+			"\n" +
+			"Edge metrics:\n" +
+			"  triangle triplet count: 779,202\n" +
+			"  rectangle triplet count: 2,506,371\n" +
+			"  maximum triangle triplets: 3,160\n" +
+			"  maximum rectangle triplets: 16,835\n";
 
-		expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
-		expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+		expectedOutput(parameters(12, "directed", "hash"), expected);
+		expectedOutput(parameters(12, "directed", "print"), expected);
 	}
 
 	@Test
-	public void testWithUndirectedRMatIntegerGraph() throws Exception {
+	public void testWithSmallUndirectedRMatIntegerGraph() throws Exception {
 		String expected = "\n" +
 			"Vertex metrics:\n" +
-			"  vertex count: 902\n" +
-			"  edge count: 10,442\n" +
-			"  average degree: 23.153\n" +
-			"  density: 0.025697\n" +
-			"  triplet count: 1,003,442\n" +
-			"  maximum degree: 463\n" +
-			"  maximum triplets: 106,953\n" +
+			"  vertex count: 117\n" +
+			"  edge count: 927\n" +
+			"  average degree: 15.846\n" +
+			"  density: 0.13660477\n" +
+			"  triplet count: 29,286\n" +
+			"  maximum degree: 91\n" +
+			"  maximum triplets: 4,095\n" +
 			"\n" +
 			"Edge metrics:\n" +
-			"  triangle triplet count: 107,817\n" +
-			"  rectangle triplet count: 315,537\n" +
-			"  maximum triangle triplets: 820\n" +
-			"  maximum rectangle triplets: 3,822\n";
+			"  triangle triplet count: 4,575\n" +
+			"  rectangle triplet count: 11,756\n" +
+			"  maximum triangle triplets: 153\n" +
+			"  maximum rectangle triplets: 391\n";
+
+		expectedOutput(parameters(7, "undirected", "hash"), expected);
+		expectedOutput(parameters(7, "undirected", "print"), expected);
+	}
+
+	@Test
+	public void testWithLargeUndirectedRMatIntegerGraph() throws Exception {
+		// skip 'byte' which cannot store vertex IDs for scale > 8
+		Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
 
-		String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output"};
+		// skip 'string' which does not compare numerically and generates a different triangle count
+		Assume.assumeFalse(idType.equals("string") || idType.equals("nativeString"));
+
+		String expected = "\n" +
+			"Vertex metrics:\n" +
+			"  vertex count: 3,349\n" +
+			"  edge count: 48,485\n" +
+			"  average degree: 28.955\n" +
+			"  density: 0.00864842\n" +
+			"  triplet count: 9,276,207\n" +
+			"  maximum degree: 1,356\n" +
+			"  maximum triplets: 918,690\n" +
+			"\n" +
+			"Edge metrics:\n" +
+			"  triangle triplet count: 779,202\n" +
+			"  rectangle triplet count: 2,506,371\n" +
+			"  maximum triangle triplets: 3,160\n" +
+			"  maximum rectangle triplets: 16,835\n";
 
-		expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
-		expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+		expectedOutput(parameters(12, "undirected", "hash"), expected);
+		expectedOutput(parameters(12, "undirected", "print"), expected);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
index 5474d1b..282d3d5 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class HITSITCase
 extends DriverBaseITCase {
 
-	public HITSITCase(TestExecutionMode mode) {
-		super(mode);
+	public HITSITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String output) {
+		return new String[] {
+			"--algorithm", "HITS",
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "directed",
+			"--output", output};
 	}
 
 	@Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "HITS",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "print"},
-			902);
+	public void testPrintWithSmallRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedCount(parameters(8, "print"), 233);
+	}
+
+	@Test
+	public void testPrintWithLargeRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		// skip 'byte' which cannot store vertex IDs for scale > 8
+		Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+		expectedCount(parameters(12, "print"), 3349);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
index 0632856..0391771 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -18,17 +18,29 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class JaccardIndexITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-	public JaccardIndexITCase(TestExecutionMode mode) {
-		super(mode);
+	public JaccardIndexITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String output, String... additionalParameters) {
+		String[] parameters = new String[] {
+			"--algorithm", "JaccardIndex", "--mirror_results",
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "undirected",
+			"--output", output};
+
+		return ArrayUtils.addAll(parameters, additionalParameters);
 	}
 
 	@Test
@@ -42,22 +54,67 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testHashWithRMatIntegerGraph() throws Exception {
-		String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "JaccardIndex",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+	public void testHashWithSmallRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x0000164757052eebL;
+				break;
+
+			case "long":
+				checksum = 0x000016337a6a7270L;
+				break;
+
+			case "string":
+				checksum = 0x00001622a522a290L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(parameters(7, "hash"), 11388, checksum);
 	}
 
 	@Test
-	public void testPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "JaccardIndex",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "print"},
-			221628);
+	public void testHashWithLargeRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+				return;
+
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x0021ce158d811c4eL;
+				break;
+
+			case "long":
+				checksum = 0x0021d20fb3904720L;
+				break;
+
+			case "string":
+				checksum = 0x0021cd8fafec1524L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(parameters(12, "hash"), 4432058, checksum);
+	}
+
+	@Test
+	public void testPrintWithSmallRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(parameters(7, "print"), new Checksum(11388, 0x0000163b17088256L));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
index d7301d0..4ca0a85 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.drivers;
 
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
 public class PageRankITCase
 extends DriverBaseITCase {
 
-	public PageRankITCase(TestExecutionMode mode) {
-		super(mode);
+	public PageRankITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String output) {
+		return new String[] {
+			"--algorithm", "PageRank",
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "directed",
+			"--output", output};
 	}
 
 	@Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "PageRank",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "print"},
-			902);
+	public void testPrintWithSmallRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedCount(parameters(8, "print"), 233);
+	}
+
+	@Test
+	public void testPrintWithLargeRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		// skip 'byte' which cannot store vertex IDs for scale > 8
+		Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+		expectedCount(parameters(12, "print"), 3349);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
index 0d2897c..fabdae1 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -18,17 +18,33 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TriangleListingITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
 
-	public TriangleListingITCase(TestExecutionMode mode) {
-		super(mode);
+	public TriangleListingITCase(String idType, TestExecutionMode mode) {
+		super(idType, mode);
+	}
+
+	private String[] parameters(int scale, String order, String output) {
+		String[] parameters =  new String[] {
+			"--algorithm", "TriangleListing", "--order", order,
+			"--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", order,
+			"--output", output};
+
+		if (output.equals("hash")) {
+			return ArrayUtils.addAll(parameters, "--sort_triangle_vertices", "--triadic_census");
+		} else {
+			return parameters;
+		}
 	}
 
 	@Test
@@ -42,66 +58,224 @@ extends DriverBaseITCase {
 	}
 
 	@Test
-	public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+	public void testHashWithSmallDirectedRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x000000003d2f0a9aL;
+				break;
+
+			case "long":
+				checksum = 0x000000016aba3720L;
+				break;
+
+			case "string":
+				checksum = 0x0000005bfef84facL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
 		String expected = "\n" +
-			"ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+			new Checksum(3822, checksum) + "\n" +
 			"Triadic census:\n" +
-			"  003: 113,435,893\n" +
-			"  012: 6,632,528\n" +
-			"  102: 983,535\n" +
-			"  021d: 118,574\n" +
-			"  021u: 118,566\n" +
-			"  021c: 237,767\n" +
-			"  111d: 129,773\n" +
-			"  111u: 130,041\n" +
-			"  030t: 16,981\n" +
-			"  030c: 5,535\n" +
-			"  201: 43,574\n" +
-			"  120d: 7,449\n" +
-			"  120u: 7,587\n" +
-			"  120c: 15,178\n" +
-			"  210: 17,368\n" +
-			"  300: 4,951\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "hash"},
-			expected);
+			"  003: 178,989\n" +
+			"  012: 47,736\n" +
+			"  102: 11,763\n" +
+			"  021d: 2,258\n" +
+			"  021u: 2,064\n" +
+			"  021c: 4,426\n" +
+			"  111d: 3,359\n" +
+			"  111u: 3,747\n" +
+			"  030t: 624\n" +
+			"  030c: 220\n" +
+			"  201: 1,966\n" +
+			"  120d: 352\n" +
+			"  120u: 394\n" +
+			"  120c: 704\n" +
+			"  210: 1,120\n" +
+			"  300: 408\n";
+
+		expectedOutput(parameters(7, "directed", "hash"), expected);
 	}
 
 	@Test
-	public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "TriangleListing", "--order", "directed",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
-				"--output", "print"},
-			75049);
+	public void testHashWithSmallUndirectedRMatGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x0000000001f92b0cL;
+				break;
+
+			case "long":
+				checksum = 0x000000000bb355c6L;
+				break;
+
+			case "string":
+				checksum = 0x00000002f7b5576aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		String expected = "\n" +
+			new Checksum(3822, checksum) + "\n" +
+			"Triadic census:\n" +
+			"  03: 178,989\n" +
+			"  12: 59,499\n" +
+			"  21: 17,820\n" +
+			"  30: 3,822\n";
+
+		expectedOutput(parameters(7, "undirected", "hash"), expected);
 	}
 
 	@Test
-	public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+	public void testHashWithLargeDirectedRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+				return;
+
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x00000248fef26209L;
+				break;
+
+			case "long":
+				checksum = 0x000002dcdf0fbb1bL;
+				break;
+
+			case "string":
+				checksum = 0x00035b760ab9da74L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
 		String expected = "\n" +
-			"ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+			new Checksum(479818, checksum) + "\n" +
 			"Triadic census:\n" +
-			"  03: 113,435,893\n" +
-			"  12: 7,616,063\n" +
-			"  21: 778,295\n" +
-			"  30: 75,049\n";
-
-		expectedOutput(
-			new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "hash"},
-			expected);
+			"  003: 6,101,196,568\n" +
+			"  012: 132,051,207\n" +
+			"  102: 13,115,128\n" +
+			"  021d: 1,330,423\n" +
+			"  021u: 1,336,897\n" +
+			"  021c: 2,669,285\n" +
+			"  111d: 1,112,144\n" +
+			"  111u: 1,097,452\n" +
+			"  030t: 132,048\n" +
+			"  030c: 44,127\n" +
+			"  201: 290,552\n" +
+			"  120d: 47,734\n" +
+			"  120u: 47,780\n" +
+			"  120c: 95,855\n" +
+			"  210: 90,618\n" +
+			"  300: 21,656\n";
+
+		expectedOutput(parameters(12, "directed", "hash"), expected);
 	}
 
 	@Test
-	public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
-		expectedCount(
-			new String[]{"--algorithm", "TriangleListing", "--order", "undirected",
-				"--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
-				"--output", "print"},
-			75049);
+	public void testHashWithLargeUndirectedRMatGraph() throws Exception {
+		// computation is too large for collection mode
+		Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+				return;
+
+			case "short":
+			case "char":
+			case "integer":
+				checksum = 0x00000012dee4bf2cL;
+				break;
+
+			case "long":
+				checksum = 0x00000017a40efbdaL;
+				break;
+
+			case "string":
+				checksum = 0x000159e8be3e370bL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		String expected = "\n" +
+			new Checksum(479818, checksum) + "\n" +
+			"Triadic census:\n" +
+			"  03: 6,101,196,568\n" +
+			"  12: 145,166,335\n" +
+			"  21: 7,836,753\n" +
+			"  30: 479,818\n";
+
+		expectedOutput(parameters(12, "undirected", "hash"), expected);
+	}
+
+
+	@Test
+	public void testPrintWithSmallDirectedRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "integer":
+			case "long":
+				checksum = 0x00000764227995aaL;
+				break;
+
+			case "string":
+				checksum = 0x000007643d93c30aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedOutputChecksum(parameters(7, "directed", "print"), new Checksum(3822, checksum));
+	}
+
+
+	@Test
+	public void testPrintWithSmallUndirectedRMatGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "short":
+			case "integer":
+			case "long":
+				checksum = 0x0000077d1582e206L;
+				break;
+
+			case "string":
+				checksum = 0x0000077a49cb5268L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedOutputChecksum(parameters(7, "undirected", "print"), new Checksum(3822, checksum));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
new file mode 100644
index 0000000..da77b0d
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToChar;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToCharValue;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToString;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByte;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByteValue;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedInt;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToLong;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShort;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShortValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GeneratedGraphTest {
+
+	private TranslateFunction<LongValue, ByteValue> byteValueTranslator = new LongValueToUnsignedByteValue();
+	private TranslateFunction<LongValue, Byte> byteTranslator = new LongValueToUnsignedByte();
+	private TranslateFunction<LongValue, ShortValue> shortValueTranslator = new LongValueToUnsignedShortValue();
+	private TranslateFunction<LongValue, Short> shortTranslator = new LongValueToUnsignedShort();
+	private TranslateFunction<LongValue, CharValue> charValueTranslator = new LongValueToCharValue();
+	private TranslateFunction<LongValue, Character> charTranslator = new LongValueToChar();
+	private TranslateFunction<LongValue, Integer> intTranslator = new LongValueToUnsignedInt();
+	private TranslateFunction<LongValue, Long> longTranslator = new LongValueToLong();
+	private TranslateFunction<LongValue, String> stringTranslator = new LongValueToString();
+
+	private ByteValue byteValue = new ByteValue();
+	private ShortValue shortValue = new ShortValue();
+	private CharValue charValue = new CharValue();
+
+	// ByteValue
+
+	@Test
+	public void testByteValueTranslation() throws Exception {
+		assertEquals(new ByteValue((byte) 0), byteValueTranslator.translate(new LongValue(0L), byteValue));
+		assertEquals(new ByteValue(Byte.MIN_VALUE), byteValueTranslator.translate(new LongValue((long) Byte.MAX_VALUE + 1), byteValue));
+		assertEquals(new ByteValue((byte) -1), byteValueTranslator.translate(new LongValue((1L << 8) - 1), byteValue));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testByteValueTranslationUpperOutOfRange() throws Exception {
+		byteValueTranslator.translate(new LongValue(1L << 8), byteValue);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testByteValueTranslationLowerOutOfRange() throws Exception {
+		byteValueTranslator.translate(new LongValue(-1), byteValue);
+	}
+
+	// Byte
+
+	@Test
+	public void testByteTranslation() throws Exception {
+		assertEquals(Byte.valueOf((byte) 0), byteTranslator.translate(new LongValue(0L), null));
+		assertEquals(Byte.valueOf(Byte.MIN_VALUE), byteTranslator.translate(new LongValue((long) Byte.MAX_VALUE + 1), null));
+		assertEquals(Byte.valueOf((byte) -1), byteTranslator.translate(new LongValue((1L << 8) - 1), null));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testByteTranslationUpperOutOfRange() throws Exception {
+		byteTranslator.translate(new LongValue(1L << 8), null);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testByteTranslationLowerOutOfRange() throws Exception {
+		byteTranslator.translate(new LongValue(-1), null);
+	}
+
+	// ShortValue
+
+	@Test
+	public void testShortValueTranslation() throws Exception {
+		assertEquals(new ShortValue((short) 0), shortValueTranslator.translate(new LongValue(0L), shortValue));
+		assertEquals(new ShortValue(Short.MIN_VALUE), shortValueTranslator.translate(new LongValue((long) Short.MAX_VALUE + 1), shortValue));
+		assertEquals(new ShortValue((short) -1), shortValueTranslator.translate(new LongValue((1L << 16) - 1), shortValue));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testShortValueTranslationUpperOutOfRange() throws Exception {
+		shortValueTranslator.translate(new LongValue(1L << 16), shortValue);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testShortValueTranslationLowerOutOfRange() throws Exception {
+		shortValueTranslator.translate(new LongValue(-1), shortValue);
+	}
+
+	// Short
+
+	@Test
+	public void testShortTranslation() throws Exception {
+		assertEquals(Short.valueOf((short) 0), shortTranslator.translate(new LongValue(0L), null));
+		assertEquals(Short.valueOf(Short.MIN_VALUE), shortTranslator.translate(new LongValue((long) Short.MAX_VALUE + 1), null));
+		assertEquals(Short.valueOf((short) -1), shortTranslator.translate(new LongValue((1L << 16) - 1), null));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testShortTranslationUpperOutOfRange() throws Exception {
+		shortTranslator.translate(new LongValue(1L << 16), null);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testShortTranslationLowerOutOfRange() throws Exception {
+		shortTranslator.translate(new LongValue(-1), null);
+	}
+
+	// CharValue
+
+	@Test
+	public void testCharValueTranslation() throws Exception {
+		assertEquals(new CharValue((char) 0), charValueTranslator.translate(new LongValue(0L), charValue));
+		assertEquals(new CharValue(Character.MAX_VALUE), charValueTranslator.translate(new LongValue((long) Character.MAX_VALUE), charValue));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testCharValueTranslationUpperOutOfRange() throws Exception {
+		charValueTranslator.translate(new LongValue(1L << 16), charValue);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testCharValueTranslationLowerOutOfRange() throws Exception {
+		charValueTranslator.translate(new LongValue(-1), charValue);
+	}
+
+	// Character
+
+	@Test
+	public void testCharacterTranslation() throws Exception {
+		assertEquals(Character.valueOf((char) 0), charTranslator.translate(new LongValue(0L), null));
+		assertEquals(Character.valueOf(Character.MAX_VALUE), charTranslator.translate(new LongValue((long) Character.MAX_VALUE), null));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testCharacterTranslationUpperOutOfRange() throws Exception {
+		charTranslator.translate(new LongValue(1L << 16), null);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testCharacterTranslationLowerOutOfRange() throws Exception {
+		charTranslator.translate(new LongValue(-1), null);
+	}
+
+	// Integer
+
+	@Test
+	public void testIntegerTranslation() throws Exception {
+		assertEquals(Integer.valueOf(0), intTranslator.translate(new LongValue(0L), null));
+		assertEquals(Integer.valueOf(Integer.MIN_VALUE), intTranslator.translate(new LongValue((long) Integer.MAX_VALUE + 1), null));
+		assertEquals(Integer.valueOf(-1), intTranslator.translate(new LongValue((1L << 32) - 1), null));
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testIntegerTranslationUpperOutOfRange() throws Exception {
+		intTranslator.translate(new LongValue(1L << 32), null);
+	}
+
+	@Test(expected=IllegalArgumentException.class)
+	public void testIntegerTranslationLowerOutOfRange() throws Exception {
+		intTranslator.translate(new LongValue(-1), null);
+	}
+
+	// Long
+
+	@Test
+	public void testLongTranslation() throws Exception {
+		assertEquals(Long.valueOf(0L), longTranslator.translate(new LongValue(0L), null));
+		assertEquals(Long.valueOf(Long.MIN_VALUE), longTranslator.translate(new LongValue(Long.MIN_VALUE), null));
+		assertEquals(Long.valueOf(Long.MAX_VALUE), longTranslator.translate(new LongValue(Long.MAX_VALUE), null));
+	}
+
+	// String
+
+	@Test
+	public void testStringTranslation() throws Exception {
+		assertEquals("0", stringTranslator.translate(new LongValue(0L), null));
+		assertEquals("-9223372036854775808", stringTranslator.translate(new LongValue(Long.MIN_VALUE), null));
+		assertEquals("9223372036854775807", stringTranslator.translate(new LongValue(Long.MAX_VALUE), null));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
index 038a2e4..30a74df 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.MathUtils;
 public class LongValueToSignedIntValue
 implements TranslateFunction<LongValue, IntValue> {
 
+	public static final long MAX_VERTEX_COUNT = 1L << 32;
+
 	@Override
 	public IntValue translate(LongValue value, IntValue reuse)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
index 8fe665c..741bd62 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
 public class LongValueToUnsignedIntValue
 implements TranslateFunction<LongValue, IntValue> {
 
+	public static final long MAX_VERTEX_COUNT = 1L << 32;
+
 	@Override
 	public IntValue translate(LongValue value, IntValue reuse)
 			throws Exception {
@@ -39,10 +41,10 @@ implements TranslateFunction<LongValue, IntValue> {
 
 		long l = value.getValue();
 
-		if (l < 0 || l >= (1L << 32)) {
+		if (l < 0 || l >= MAX_VERTEX_COUNT) {
 			throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
 		} else {
-			reuse.setValue((int)(l & 0xffffffffL));
+			reuse.setValue((int) (l & (MAX_VERTEX_COUNT - 1)));
 		}
 
 		return reuse;

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index dfa7eb2..84cb791 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -80,17 +80,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@ForwardedFields("*->f0")
-	public class LinkVertexToAll
+	private static class LinkVertexToAll
 	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
-
 		private final long vertexCount;
 
 		private LongValue target = new LongValue();
 
 		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
 
-		public LinkVertexToAll(long vertex_count) {
-			this.vertexCount = vertex_count;
+		public LinkVertexToAll(long vertexCount) {
+			this.vertexCount = vertexCount;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 0ca804e..16a9ab9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -107,7 +107,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@ForwardedFields("*->f0")
-	public class LinkVertexToNeighbors
+	private static class LinkVertexToNeighbors
 	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private long vertexCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 95a4f85..0fa4127 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -166,7 +166,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		return Graph.fromDataSet(vertices, edges, env);
 	}
 
-	private static final class GenerateEdges<T extends RandomGenerator>
+	private static class GenerateEdges<T extends RandomGenerator>
 	implements FlatMapFunction<BlockInfo<T>, Edge<LongValue, NullValue>> {
 
 		// Configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 6c7c433..316d749 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -80,7 +80,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@ForwardedFields("*->f0")
-	public class LinkVertexToCenter
+	private static class LinkVertexToCenter
 	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private LongValue center = new LongValue(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 4857add..ba1ab21 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -60,8 +60,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * <p>
  * http://www.cs.cornell.edu/home/kleinber/auth.pdf
  *
- * http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
  * @param <K> graph ID type
  * @param <VV> vertex value type
  * @param <EV> edge value type

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index bc3cb86..b3e69f1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -80,6 +80,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private int maximumScoreDenominator = 1;
 
+	private boolean mirrorResults;
+
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
@@ -141,6 +143,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
+	 * By default only one result is output for each pair of vertices. When
+	 * mirroring is enabled, a second result with the vertex order flipped is
+	 * also output for each pair of vertices.
+	 *
+	 * @param mirrorResults whether output results should be mirrored
+	 * @return this instance, so that configuration calls can be chained
+	 */
+	public JaccardIndex<K, VV, EV> setMirrorResults(boolean mirrorResults) {
+		this.mirrorResults = mirrorResults;
+
+		return this;
+	}
+	}
+
+	/**
 	 * Override the parallelism of operators processing small amounts of data.
 	 *
 	 * @param littleParallelism operator parallelism
@@ -176,7 +192,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			minimumScoreNumerator != rhs.minimumScoreNumerator ||
 			minimumScoreDenominator != rhs.minimumScoreDenominator ||
 			maximumScoreNumerator != rhs.maximumScoreNumerator ||
-			maximumScoreDenominator != rhs.maximumScoreDenominator) {
+			maximumScoreDenominator != rhs.maximumScoreDenominator ||
+			mirrorResults != rhs.mirrorResults) {
 			return false;
 		}
 
@@ -230,12 +247,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Generate group pairs");
 
 		// t, u, intersection, union
-		return twoPaths
+		DataSet<Result<K>> scores = twoPaths
 			.groupBy(0, 1)
 			.reduceGroup(new ComputeScores<K>(unboundedScores,
 					minimumScoreNumerator, minimumScoreDenominator,
 					maximumScoreNumerator, maximumScoreDenominator))
 				.name("Compute scores");
+
+		if (mirrorResults) {
+			scores = scores
+				.flatMap(new MirrorResult<K, Result<K>>())
+					.name("Mirror results");
+		}
+
+		return scores;
 	}
 
 	/**
@@ -451,6 +476,27 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
+	 * Emits each input as received, then emits the same instance again with its two vertex IDs swapped.
+	 *
+	 * @param <T> ID type
+	 * @param <RT> result type
+	 */
+	private static class MirrorResult<T, RT extends BinaryResult<T>>
+	implements FlatMapFunction<RT, RT> {
+		@Override
+		public void flatMap(RT value, Collector<RT> out)
+				throws Exception {
+			out.collect(value); // first output: the input in its original vertex order
+
+			T tmp = value.getVertexId0(); // swap the two vertex IDs in place; no new result object is created
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp); // NOTE(review): reads and mutates an object already passed to the collector - confirm the collector does not retain or alter it
+
+			out.collect(value); // second output: the same instance, now with the vertex order flipped
+		}
+	}
+
+	/**
 	 * Wraps the vertex type to encapsulate results from the jaccard index algorithm.
 	 *
 	 * @param <T> ID type

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1811038..469a23f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -60,7 +60,7 @@ public class AsmTestBase {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 		env.getConfig().enableObjectReuse();
 
-		// the "fish" graph
+		// a "fish" graph
 		Object[][] edges = new Object[][]{
 			new Object[]{0, 1},
 			new Object[]{0, 2},

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
index 976cc34..f730adf 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
@@ -40,11 +40,11 @@ public class LongValueToSignedIntValueTest {
 
 	@Test(expected=IllegalArgumentException.class)
 	public void testUpperOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+		translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse);
 	}
 
 	@Test(expected=IllegalArgumentException.class)
 	public void testLowerOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse));
+		translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
index 6a2ae83..ca70162 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
@@ -34,17 +34,17 @@ public class LongValueToUnsignedIntValueTest {
 	@Test
 	public void testTranslation() throws Exception {
 		assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
-		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+		assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long) Integer.MAX_VALUE + 1), reuse));
 		assertEquals(new IntValue(-1), translator.translate(new LongValue((1L << 32) - 1), reuse));
 	}
 
 	@Test(expected=IllegalArgumentException.class)
 	public void testUpperOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue(1L << 32), reuse));
+		translator.translate(new LongValue(1L << 32), reuse);
 	}
 
 	@Test(expected=IllegalArgumentException.class)
 	public void testLowerOutOfRange() throws Exception {
-		assertEquals(new IntValue(), translator.translate(new LongValue(-1), reuse));
+		translator.translate(new LongValue(-1), reuse);
 	}
 }