You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:34 UTC
[27/50] [abbrv] flink git commit: [FLINK-6382] [gelly] Support
additional types for generated graphs in Gelly examples
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
index a5ea486..8c5ed86 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/GraphMetricsITCase.java
@@ -18,8 +18,8 @@
package org.apache.flink.graph.drivers;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -28,8 +28,15 @@ import org.junit.runners.Parameterized;
public class GraphMetricsITCase
extends DriverBaseITCase {
- public GraphMetricsITCase(TestExecutionMode mode) {
- super(mode);
+ public GraphMetricsITCase(String idType, TestExecutionMode mode) {
+ super(idType, mode);
+ }
+
+ private String[] parameters(int scale, String order, String output) {
+ return new String[] {
+ "--algorithm", "GraphMetrics", "--order", order,
+ "--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", order,
+ "--output", output};
}
@Test
@@ -43,58 +50,110 @@ extends DriverBaseITCase {
}
@Test
- public void testWithDirectedRMatIntegerGraph() throws Exception {
+ public void testWithSmallDirectedRMatIntegerGraph() throws Exception {
String expected = "\n" +
"Vertex metrics:\n" +
- " vertex count: 902\n" +
- " edge count: 12,009\n" +
- " unidirectional edge count: 8,875\n" +
- " bidirectional edge count: 1,567\n" +
- " average degree: 13.314\n" +
- " density: 0.01477663\n" +
- " triplet count: 1,003,442\n" +
- " maximum degree: 463\n" +
- " maximum out degree: 334\n" +
- " maximum in degree: 342\n" +
- " maximum triplets: 106,953\n" +
+ " vertex count: 117\n" +
+ " edge count: 1,168\n" +
+ " unidirectional edge count: 686\n" +
+ " bidirectional edge count: 241\n" +
+ " average degree: 9.983\n" +
+ " density: 0.08605953\n" +
+ " triplet count: 29,286\n" +
+ " maximum degree: 91\n" +
+ " maximum out degree: 77\n" +
+ " maximum in degree: 68\n" +
+ " maximum triplets: 4,095\n" +
"\n" +
"Edge metrics:\n" +
- " triangle triplet count: 107,817\n" +
- " rectangle triplet count: 315,537\n" +
- " maximum triangle triplets: 820\n" +
- " maximum rectangle triplets: 3,822\n";
+ " triangle triplet count: 4,575\n" +
+ " rectangle triplet count: 11,756\n" +
+ " maximum triangle triplets: 153\n" +
+ " maximum rectangle triplets: 391\n";
+
+ expectedOutput(parameters(7, "directed", "hash"), expected);
+ expectedOutput(parameters(7, "directed", "print"), expected);
+ }
+
+ @Test
+ public void testWithLargeDirectedRMatIntegerGraph() throws Exception {
+ // skip 'byte' which cannot store vertex IDs for scale > 8
+ Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+ // skip 'string' which does not compare numerically and generates a different triangle count
+ Assume.assumeFalse(idType.equals("string") || idType.equals("nativeString"));
- String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "directed",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
- "--output"};
+ String expected = "\n" +
+ "Vertex metrics:\n" +
+ " vertex count: 3,349\n" +
+ " edge count: 53,368\n" +
+ " unidirectional edge count: 43,602\n" +
+ " bidirectional edge count: 4,883\n" +
+ " average degree: 15.936\n" +
+ " density: 0.00475971\n" +
+ " triplet count: 9,276,207\n" +
+ " maximum degree: 1,356\n" +
+ " maximum out degree: 921\n" +
+ " maximum in degree: 966\n" +
+ " maximum triplets: 918,690\n" +
+ "\n" +
+ "Edge metrics:\n" +
+ " triangle triplet count: 779,202\n" +
+ " rectangle triplet count: 2,506,371\n" +
+ " maximum triangle triplets: 3,160\n" +
+ " maximum rectangle triplets: 16,835\n";
- expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
- expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+ expectedOutput(parameters(12, "directed", "hash"), expected);
+ expectedOutput(parameters(12, "directed", "print"), expected);
}
@Test
- public void testWithUndirectedRMatIntegerGraph() throws Exception {
+ public void testWithSmallUndirectedRMatIntegerGraph() throws Exception {
String expected = "\n" +
"Vertex metrics:\n" +
- " vertex count: 902\n" +
- " edge count: 10,442\n" +
- " average degree: 23.153\n" +
- " density: 0.025697\n" +
- " triplet count: 1,003,442\n" +
- " maximum degree: 463\n" +
- " maximum triplets: 106,953\n" +
+ " vertex count: 117\n" +
+ " edge count: 927\n" +
+ " average degree: 15.846\n" +
+ " density: 0.13660477\n" +
+ " triplet count: 29,286\n" +
+ " maximum degree: 91\n" +
+ " maximum triplets: 4,095\n" +
"\n" +
"Edge metrics:\n" +
- " triangle triplet count: 107,817\n" +
- " rectangle triplet count: 315,537\n" +
- " maximum triangle triplets: 820\n" +
- " maximum rectangle triplets: 3,822\n";
+ " triangle triplet count: 4,575\n" +
+ " rectangle triplet count: 11,756\n" +
+ " maximum triangle triplets: 153\n" +
+ " maximum rectangle triplets: 391\n";
+
+ expectedOutput(parameters(7, "undirected", "hash"), expected);
+ expectedOutput(parameters(7, "undirected", "print"), expected);
+ }
+
+ @Test
+ public void testWithLargelUndirectedRMatIntegerGraph() throws Exception {
+ // skip 'byte' which cannot store vertex IDs for scale > 8
+ Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
- String[] arguments = new String[]{"--algorithm", "GraphMetrics", "--order", "undirected",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
- "--output"};
+ // skip 'string' which does not compare numerically and generates a different triangle count
+ Assume.assumeFalse(idType.equals("string") || idType.equals("nativeString"));
+
+ String expected = "\n" +
+ "Vertex metrics:\n" +
+ " vertex count: 3,349\n" +
+ " edge count: 48,485\n" +
+ " average degree: 28.955\n" +
+ " density: 0.00864842\n" +
+ " triplet count: 9,276,207\n" +
+ " maximum degree: 1,356\n" +
+ " maximum triplets: 918,690\n" +
+ "\n" +
+ "Edge metrics:\n" +
+ " triangle triplet count: 779,202\n" +
+ " rectangle triplet count: 2,506,371\n" +
+ " maximum triangle triplets: 3,160\n" +
+ " maximum rectangle triplets: 16,835\n";
- expectedOutput(ArrayUtils.addAll(arguments, "hash"), expected);
- expectedOutput(ArrayUtils.addAll(arguments, "print"), expected);
+ expectedOutput(parameters(12, "undirected", "hash"), expected);
+ expectedOutput(parameters(12, "undirected", "print"), expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
index 5474d1b..282d3d5 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/HITSITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.drivers;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
public class HITSITCase
extends DriverBaseITCase {
- public HITSITCase(TestExecutionMode mode) {
- super(mode);
+ public HITSITCase(String idType, TestExecutionMode mode) {
+ super(idType, mode);
+ }
+
+ private String[] parameters(int scale, String output) {
+ return new String[] {
+ "--algorithm", "HITS",
+ "--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "directed",
+ "--output", output};
}
@Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
}
@Test
- public void testPrintWithRMatIntegerGraph() throws Exception {
- expectedCount(
- new String[]{"--algorithm", "HITS",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
- "--output", "print"},
- 902);
+ public void testPrintWithSmallRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ expectedCount(parameters(8, "print"), 233);
+ }
+
+ @Test
+ public void testPrintWithLargeRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ // skip 'byte' which cannot store vertex IDs for scale > 8
+ Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+ expectedCount(parameters(12, "print"), 3349);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
index 0632856..0391771 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/JaccardIndexITCase.java
@@ -18,17 +18,29 @@
package org.apache.flink.graph.drivers;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JaccardIndexITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
- public JaccardIndexITCase(TestExecutionMode mode) {
- super(mode);
+ public JaccardIndexITCase(String idType, TestExecutionMode mode) {
+ super(idType, mode);
+ }
+
+ private String[] parameters(int scale, String output, String... additionalParameters) {
+ String[] parameters = new String[] {
+ "--algorithm", "JaccardIndex", "--mirror_results",
+ "--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "undirected",
+ "--output", output};
+
+ return ArrayUtils.addAll(parameters, additionalParameters);
}
@Test
@@ -42,22 +54,67 @@ extends DriverBaseITCase {
}
@Test
- public void testHashWithRMatIntegerGraph() throws Exception {
- String expected = "\nChecksumHashCode 0x0001b188570b2572, count 221628\n";
-
- expectedOutput(
- new String[]{"--algorithm", "JaccardIndex",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
- "--output", "hash"},
- expected);
+ public void testHashWithSmallRMatGraph() throws Exception {
+ long checksum;
+ switch (idType) {
+ case "byte":
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x0000164757052eebL;
+ break;
+
+ case "long":
+ checksum = 0x000016337a6a7270L;
+ break;
+
+ case "string":
+ checksum = 0x00001622a522a290L;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ expectedChecksum(parameters(7, "hash"), 11388, checksum);
}
@Test
- public void testPrintWithRMatIntegerGraph() throws Exception {
- expectedCount(
- new String[]{"--algorithm", "JaccardIndex",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
- "--output", "print"},
- 221628);
+ public void testHashWithLargeRMatGraph() throws Exception {
+ // computation is too large for collection mode
+ Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+ long checksum;
+ switch (idType) {
+ case "byte":
+ return;
+
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x0021ce158d811c4eL;
+ break;
+
+ case "long":
+ checksum = 0x0021d20fb3904720L;
+ break;
+
+ case "string":
+ checksum = 0x0021cd8fafec1524L;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ expectedChecksum(parameters(12, "hash"), 4432058, checksum);
+ }
+
+ @Test
+ public void testPrintWithSmallRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ expectedOutputChecksum(parameters(7, "print"), new Checksum(11388, 0x0000163b17088256L));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
index d7301d0..4ca0a85 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/PageRankITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.drivers;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -27,8 +28,15 @@ import org.junit.runners.Parameterized;
public class PageRankITCase
extends DriverBaseITCase {
- public PageRankITCase(TestExecutionMode mode) {
- super(mode);
+ public PageRankITCase(String idType, TestExecutionMode mode) {
+ super(idType, mode);
+ }
+
+ private String[] parameters(int scale, String output) {
+ return new String[] {
+ "--algorithm", "PageRank",
+ "--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", "directed",
+ "--output", output};
}
@Test
@@ -42,11 +50,21 @@ extends DriverBaseITCase {
}
@Test
- public void testPrintWithRMatIntegerGraph() throws Exception {
- expectedCount(
- new String[]{"--algorithm", "PageRank",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
- "--output", "print"},
- 902);
+ public void testPrintWithSmallRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ expectedCount(parameters(8, "print"), 233);
+ }
+
+ @Test
+ public void testPrintWithLargeRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ // skip 'byte' which cannot store vertex IDs for scale > 8
+ Assume.assumeFalse(idType.equals("byte") || idType.equals("nativeByte"));
+
+ expectedCount(parameters(12, "print"), 3349);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
index 0d2897c..fabdae1 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -18,17 +18,33 @@
package org.apache.flink.graph.drivers;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TriangleListingITCase
-extends DriverBaseITCase {
+extends CopyableValueDriverBaseITCase {
- public TriangleListingITCase(TestExecutionMode mode) {
- super(mode);
+ public TriangleListingITCase(String idType, TestExecutionMode mode) {
+ super(idType, mode);
+ }
+
+ private String[] parameters(int scale, String order, String output) {
+ String[] parameters = new String[] {
+ "--algorithm", "TriangleListing", "--order", order,
+ "--input", "RMatGraph", "--scale", Integer.toString(scale), "--type", idType, "--simplify", order,
+ "--output", output};
+
+ if (output.equals("hash")) {
+ return ArrayUtils.addAll(parameters, "--sort_triangle_vertices", "--triadic_census");
+ } else {
+ return parameters;
+ }
}
@Test
@@ -42,66 +58,224 @@ extends DriverBaseITCase {
}
@Test
- public void testDirectedHashWithRMatIntegerGraph() throws Exception {
+ public void testHashWithSmallDirectedRMatGraph() throws Exception {
+ long checksum;
+ switch (idType) {
+ case "byte":
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x000000003d2f0a9aL;
+ break;
+
+ case "long":
+ checksum = 0x000000016aba3720L;
+ break;
+
+ case "string":
+ checksum = 0x0000005bfef84facL;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
String expected = "\n" +
- "ChecksumHashCode 0x0000001beffe6edd, count 75049\n" +
+ new Checksum(3822, checksum) + "\n" +
"Triadic census:\n" +
- " 003: 113,435,893\n" +
- " 012: 6,632,528\n" +
- " 102: 983,535\n" +
- " 021d: 118,574\n" +
- " 021u: 118,566\n" +
- " 021c: 237,767\n" +
- " 111d: 129,773\n" +
- " 111u: 130,041\n" +
- " 030t: 16,981\n" +
- " 030c: 5,535\n" +
- " 201: 43,574\n" +
- " 120d: 7,449\n" +
- " 120u: 7,587\n" +
- " 120c: 15,178\n" +
- " 210: 17,368\n" +
- " 300: 4,951\n";
-
- expectedOutput(
- new String[]{"--algorithm", "TriangleListing", "--order", "directed", "--sort_triangle_vertices", "--triadic_census",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
- "--output", "hash"},
- expected);
+ " 003: 178,989\n" +
+ " 012: 47,736\n" +
+ " 102: 11,763\n" +
+ " 021d: 2,258\n" +
+ " 021u: 2,064\n" +
+ " 021c: 4,426\n" +
+ " 111d: 3,359\n" +
+ " 111u: 3,747\n" +
+ " 030t: 624\n" +
+ " 030c: 220\n" +
+ " 201: 1,966\n" +
+ " 120d: 352\n" +
+ " 120u: 394\n" +
+ " 120c: 704\n" +
+ " 210: 1,120\n" +
+ " 300: 408\n";
+
+ expectedOutput(parameters(7, "directed", "hash"), expected);
}
@Test
- public void testDirectedPrintWithRMatIntegerGraph() throws Exception {
- expectedCount(
- new String[]{"--algorithm", "TriangleListing", "--order", "directed",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "directed",
- "--output", "print"},
- 75049);
+ public void testHashWithSmallUndirectedRMatGraph() throws Exception {
+ long checksum;
+ switch (idType) {
+ case "byte":
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x0000000001f92b0cL;
+ break;
+
+ case "long":
+ checksum = 0x000000000bb355c6L;
+ break;
+
+ case "string":
+ checksum = 0x00000002f7b5576aL;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ String expected = "\n" +
+ new Checksum(3822, checksum) + "\n" +
+ "Triadic census:\n" +
+ " 03: 178,989\n" +
+ " 12: 59,499\n" +
+ " 21: 17,820\n" +
+ " 30: 3,822\n";
+
+ expectedOutput(parameters(7, "undirected", "hash"), expected);
}
@Test
- public void testUndirectedHashWithRMatIntegerGraph() throws Exception {
+ public void testHashWithLargeDirectedRMatGraph() throws Exception {
+ // computation is too large for collection mode
+ Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+ long checksum;
+ switch (idType) {
+ case "byte":
+ return;
+
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x00000248fef26209L;
+ break;
+
+ case "long":
+ checksum = 0x000002dcdf0fbb1bL;
+ break;
+
+ case "string":
+ checksum = 0x00035b760ab9da74L;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
String expected = "\n" +
- "ChecksumHashCode 0x00000000e6b3f32c, count 75049\n" +
+ new Checksum(479818, checksum) + "\n" +
"Triadic census:\n" +
- " 03: 113,435,893\n" +
- " 12: 7,616,063\n" +
- " 21: 778,295\n" +
- " 30: 75,049\n";
-
- expectedOutput(
- new String[]{"--algorithm", "TriangleListing", "--order", "undirected", "--sort_triangle_vertices", "--triadic_census",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
- "--output", "hash"},
- expected);
+ " 003: 6,101,196,568\n" +
+ " 012: 132,051,207\n" +
+ " 102: 13,115,128\n" +
+ " 021d: 1,330,423\n" +
+ " 021u: 1,336,897\n" +
+ " 021c: 2,669,285\n" +
+ " 111d: 1,112,144\n" +
+ " 111u: 1,097,452\n" +
+ " 030t: 132,048\n" +
+ " 030c: 44,127\n" +
+ " 201: 290,552\n" +
+ " 120d: 47,734\n" +
+ " 120u: 47,780\n" +
+ " 120c: 95,855\n" +
+ " 210: 90,618\n" +
+ " 300: 21,656\n";
+
+ expectedOutput(parameters(12, "directed", "hash"), expected);
}
@Test
- public void testUndirectedPrintWithRMatIntegerGraph() throws Exception {
- expectedCount(
- new String[]{"--algorithm", "TriangleListing", "--order", "undirected",
- "--input", "RMatGraph", "--type", "integer", "--simplify", "undirected",
- "--output", "print"},
- 75049);
+ public void testHashWithLargeUndirectedRMatGraph() throws Exception {
+ // computation is too large for collection mode
+ Assume.assumeFalse(mode == TestExecutionMode.COLLECTION);
+
+ long checksum;
+ switch (idType) {
+ case "byte":
+ return;
+
+ case "short":
+ case "char":
+ case "integer":
+ checksum = 0x00000012dee4bf2cL;
+ break;
+
+ case "long":
+ checksum = 0x00000017a40efbdaL;
+ break;
+
+ case "string":
+ checksum = 0x000159e8be3e370bL;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ String expected = "\n" +
+ new Checksum(479818, checksum) + "\n" +
+ "Triadic census:\n" +
+ " 03: 6,101,196,568\n" +
+ " 12: 145,166,335\n" +
+ " 21: 7,836,753\n" +
+ " 30: 479,818\n";
+
+ expectedOutput(parameters(12, "undirected", "hash"), expected);
+ }
+
+
+ @Test
+ public void testPrintWithSmallDirectedRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ long checksum;
+ switch (idType) {
+ case "byte":
+ case "short":
+ case "integer":
+ case "long":
+ checksum = 0x00000764227995aaL;
+ break;
+
+ case "string":
+ checksum = 0x000007643d93c30aL;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ expectedOutputChecksum(parameters(7, "directed", "print"), new Checksum(3822, checksum));
+ }
+
+
+ @Test
+ public void testPrintWithSmallUndirectedRMatGraph() throws Exception {
+ // skip 'char' since it is not printed as a number
+ Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+ long checksum;
+ switch (idType) {
+ case "byte":
+ case "short":
+ case "integer":
+ case "long":
+ checksum = 0x0000077d1582e206L;
+ break;
+
+ case "string":
+ checksum = 0x0000077a49cb5268L;
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown type: " + idType);
+ }
+
+ expectedOutputChecksum(parameters(7, "undirected", "print"), new Checksum(3822, checksum));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
new file mode 100644
index 0000000..da77b0d
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/input/GeneratedGraphTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToChar;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToCharValue;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToString;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByte;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedByteValue;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedInt;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToLong;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShort;
+import org.apache.flink.graph.drivers.input.GeneratedGraph.LongValueToUnsignedShortValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GeneratedGraphTest {
+
+ private TranslateFunction<LongValue, ByteValue> byteValueTranslator = new LongValueToUnsignedByteValue();
+ private TranslateFunction<LongValue, Byte> byteTranslator = new LongValueToUnsignedByte();
+ private TranslateFunction<LongValue, ShortValue> shortValueTranslator = new LongValueToUnsignedShortValue();
+ private TranslateFunction<LongValue, Short> shortTranslator = new LongValueToUnsignedShort();
+ private TranslateFunction<LongValue, CharValue> charValueTranslator = new LongValueToCharValue();
+ private TranslateFunction<LongValue, Character> charTranslator = new LongValueToChar();
+ private TranslateFunction<LongValue, Integer> intTranslator = new LongValueToUnsignedInt();
+ private TranslateFunction<LongValue, Long> longTranslator = new LongValueToLong();
+ private TranslateFunction<LongValue, String> stringTranslator = new LongValueToString();
+
+ private ByteValue byteValue = new ByteValue();
+ private ShortValue shortValue = new ShortValue();
+ private CharValue charValue = new CharValue();
+
+ // ByteValue
+
+ @Test
+ public void testByteValueTranslation() throws Exception {
+ assertEquals(new ByteValue((byte) 0), byteValueTranslator.translate(new LongValue(0L), byteValue));
+ assertEquals(new ByteValue(Byte.MIN_VALUE), byteValueTranslator.translate(new LongValue((long) Byte.MAX_VALUE + 1), byteValue));
+ assertEquals(new ByteValue((byte) -1), byteValueTranslator.translate(new LongValue((1L << 8) - 1), byteValue));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testByteValueTranslationUpperOutOfRange() throws Exception {
+ byteValueTranslator.translate(new LongValue(1L << 8), byteValue);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testByteValueTranslationLowerOutOfRange() throws Exception {
+ byteValueTranslator.translate(new LongValue(-1), byteValue);
+ }
+
+ // Byte
+
+ @Test
+ public void testByteTranslation() throws Exception {
+ assertEquals(Byte.valueOf((byte) 0), byteTranslator.translate(new LongValue(0L), null));
+ assertEquals(Byte.valueOf(Byte.MIN_VALUE), byteTranslator.translate(new LongValue((long) Byte.MAX_VALUE + 1), null));
+ assertEquals(Byte.valueOf((byte) -1), byteTranslator.translate(new LongValue((1L << 8) - 1), null));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testByteTranslationUpperOutOfRange() throws Exception {
+ byteTranslator.translate(new LongValue(1L << 8), null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testByteTranslationLowerOutOfRange() throws Exception {
+ byteTranslator.translate(new LongValue(-1), null);
+ }
+
+ // ShortValue
+
+ @Test
+ public void testShortValueTranslation() throws Exception {
+ assertEquals(new ShortValue((short) 0), shortValueTranslator.translate(new LongValue(0L), shortValue));
+ assertEquals(new ShortValue(Short.MIN_VALUE), shortValueTranslator.translate(new LongValue((long) Short.MAX_VALUE + 1), shortValue));
+ assertEquals(new ShortValue((short) -1), shortValueTranslator.translate(new LongValue((1L << 16) - 1), shortValue));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testShortValueTranslationUpperOutOfRange() throws Exception {
+ shortValueTranslator.translate(new LongValue(1L << 16), shortValue);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testShortValueTranslationLowerOutOfRange() throws Exception {
+ shortValueTranslator.translate(new LongValue(-1), shortValue);
+ }
+
+ // Short
+
+ @Test
+ public void testShortTranslation() throws Exception {
+ assertEquals(Short.valueOf((short) 0), shortTranslator.translate(new LongValue(0L), null));
+ assertEquals(Short.valueOf(Short.MIN_VALUE), shortTranslator.translate(new LongValue((long) Short.MAX_VALUE + 1), null));
+ assertEquals(Short.valueOf((short) -1), shortTranslator.translate(new LongValue((1L << 16) - 1), null));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testShortTranslationUpperOutOfRange() throws Exception {
+ shortTranslator.translate(new LongValue(1L << 16), null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testShortTranslationLowerOutOfRange() throws Exception {
+ shortTranslator.translate(new LongValue(-1), null);
+ }
+
+ // CharValue
+
+ @Test
+ public void testCharValueTranslation() throws Exception {
+ assertEquals(new CharValue((char) 0), charValueTranslator.translate(new LongValue(0L), charValue));
+ assertEquals(new CharValue(Character.MAX_VALUE), charValueTranslator.translate(new LongValue((long) Character.MAX_VALUE), charValue));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testCharValueTranslationUpperOutOfRange() throws Exception {
+ charValueTranslator.translate(new LongValue(1L << 16), charValue);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testCharValueTranslationLowerOutOfRange() throws Exception {
+ charValueTranslator.translate(new LongValue(-1), charValue);
+ }
+
+ // Character
+
+ @Test
+ public void testCharacterTranslation() throws Exception {
+ assertEquals(Character.valueOf((char) 0), charTranslator.translate(new LongValue(0L), null));
+ assertEquals(Character.valueOf(Character.MAX_VALUE), charTranslator.translate(new LongValue((long) Character.MAX_VALUE), null));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testCharacterTranslationUpperOutOfRange() throws Exception {
+ charTranslator.translate(new LongValue(1L << 16), null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testCharacterTranslationLowerOutOfRange() throws Exception {
+ charTranslator.translate(new LongValue(-1), null);
+ }
+
+ // Integer
+
+ @Test
+ public void testIntegerTranslation() throws Exception {
+ assertEquals(Integer.valueOf(0), intTranslator.translate(new LongValue(0L), null));
+ assertEquals(Integer.valueOf(Integer.MIN_VALUE), intTranslator.translate(new LongValue((long) Integer.MAX_VALUE + 1), null));
+ assertEquals(Integer.valueOf(-1), intTranslator.translate(new LongValue((1L << 32) - 1), null));
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testIntegerTranslationUpperOutOfRange() throws Exception {
+ intTranslator.translate(new LongValue(1L << 32), null);
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ public void testIntegerTranslationLowerOutOfRange() throws Exception {
+ intTranslator.translate(new LongValue(-1), null);
+ }
+
+ // Long
+
+ @Test
+ public void testLongTranslation() throws Exception {
+ assertEquals(Long.valueOf(0L), longTranslator.translate(new LongValue(0L), null));
+ assertEquals(Long.valueOf(Long.MIN_VALUE), longTranslator.translate(new LongValue(Long.MIN_VALUE), null));
+ assertEquals(Long.valueOf(Long.MAX_VALUE), longTranslator.translate(new LongValue(Long.MAX_VALUE), null));
+ }
+
+ // String
+
+ @Test
+ public void testStringTranslation() throws Exception {
+ assertEquals("0", stringTranslator.translate(new LongValue(0L), null));
+ assertEquals("-9223372036854775808", stringTranslator.translate(new LongValue(Long.MIN_VALUE), null));
+ assertEquals("9223372036854775807", stringTranslator.translate(new LongValue(Long.MAX_VALUE), null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
index 038a2e4..30a74df 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValue.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.MathUtils;
public class LongValueToSignedIntValue
implements TranslateFunction<LongValue, IntValue> {
+ public static final long MAX_VERTEX_COUNT = 1L << 32;
+
@Override
public IntValue translate(LongValue value, IntValue reuse)
throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
index 8fe665c..741bd62 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValue.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
public class LongValueToUnsignedIntValue
implements TranslateFunction<LongValue, IntValue> {
+ public static final long MAX_VERTEX_COUNT = 1L << 32;
+
@Override
public IntValue translate(LongValue value, IntValue reuse)
throws Exception {
@@ -39,10 +41,10 @@ implements TranslateFunction<LongValue, IntValue> {
long l = value.getValue();
- if (l < 0 || l >= (1L << 32)) {
+ if (l < 0 || l >= MAX_VERTEX_COUNT) {
throw new IllegalArgumentException("Cannot cast long value " + value + " to integer.");
} else {
- reuse.setValue((int)(l & 0xffffffffL));
+ reuse.setValue((int) (l & (MAX_VERTEX_COUNT - 1)));
}
return reuse;
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index dfa7eb2..84cb791 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -80,17 +80,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
}
@ForwardedFields("*->f0")
- public class LinkVertexToAll
+ private static class LinkVertexToAll
implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
-
private final long vertexCount;
private LongValue target = new LongValue();
private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
- public LinkVertexToAll(long vertex_count) {
- this.vertexCount = vertex_count;
+ public LinkVertexToAll(long vertexCount) {
+ this.vertexCount = vertexCount;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 0ca804e..16a9ab9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -107,7 +107,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
}
@ForwardedFields("*->f0")
- public class LinkVertexToNeighbors
+ private static class LinkVertexToNeighbors
implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
private long vertexCount;
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 95a4f85..0fa4127 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -166,7 +166,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
return Graph.fromDataSet(vertices, edges, env);
}
- private static final class GenerateEdges<T extends RandomGenerator>
+ private static class GenerateEdges<T extends RandomGenerator>
implements FlatMapFunction<BlockInfo<T>, Edge<LongValue, NullValue>> {
// Configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 6c7c433..316d749 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -80,7 +80,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
}
@ForwardedFields("*->f0")
- public class LinkVertexToCenter
+ private static class LinkVertexToCenter
implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
private LongValue center = new LongValue(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 4857add..ba1ab21 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -60,8 +60,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
* <p>
* http://www.cs.cornell.edu/home/kleinber/auth.pdf
*
- * http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index bc3cb86..b3e69f1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -80,6 +80,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private int maximumScoreDenominator = 1;
+ private boolean mirrorResults;
+
private int littleParallelism = PARALLELISM_DEFAULT;
/**
@@ -141,6 +143,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
/**
+ * By default only one result is output for each pair of vertices. When
+ * mirroring a second result with the vertex order flipped is output for
+ * each pair of vertices.
+ *
+ * @param mirrorResults whether output results should be mirrored
+ * @return this
+ */
+ public JaccardIndex<K, VV, EV> setMirrorResults(boolean mirrorResults) {
+ this.mirrorResults = mirrorResults;
+
+ return this;
+ }
+
+ /**
* Override the parallelism of operators processing small amounts of data.
*
* @param littleParallelism operator parallelism
@@ -176,7 +192,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
minimumScoreNumerator != rhs.minimumScoreNumerator ||
minimumScoreDenominator != rhs.minimumScoreDenominator ||
maximumScoreNumerator != rhs.maximumScoreNumerator ||
- maximumScoreDenominator != rhs.maximumScoreDenominator) {
+ maximumScoreDenominator != rhs.maximumScoreDenominator ||
+ mirrorResults != rhs.mirrorResults) {
return false;
}
@@ -230,12 +247,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
.name("Generate group pairs");
// t, u, intersection, union
- return twoPaths
+ DataSet<Result<K>> scores = twoPaths
.groupBy(0, 1)
.reduceGroup(new ComputeScores<K>(unboundedScores,
minimumScoreNumerator, minimumScoreDenominator,
maximumScoreNumerator, maximumScoreDenominator))
.name("Compute scores");
+
+ if (mirrorResults) {
+ scores = scores
+ .flatMap(new MirrorResult<K, Result<K>>())
+ .name("Mirror results");
+ }
+
+ return scores;
}
/**
@@ -451,6 +476,27 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
/**
+ * Output each input and a second result with the vertex order flipped.
+ *
+ * @param <T> ID type
+ * @param <RT> result type
+ */
+ private static class MirrorResult<T, RT extends BinaryResult<T>>
+ implements FlatMapFunction<RT, RT> {
+ @Override
+ public void flatMap(RT value, Collector<RT> out)
+ throws Exception {
+ out.collect(value);
+
+ T tmp = value.getVertexId0();
+ value.setVertexId0(value.getVertexId1());
+ value.setVertexId1(tmp);
+
+ out.collect(value);
+ }
+ }
+
+ /**
* Wraps the vertex type to encapsulate results from the jaccard index algorithm.
*
* @param <T> ID type
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1811038..469a23f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -60,7 +60,7 @@ public class AsmTestBase {
env = ExecutionEnvironment.createCollectionsEnvironment();
env.getConfig().enableObjectReuse();
- // the "fish" graph
+ // a "fish" graph
Object[][] edges = new Object[][]{
new Object[]{0, 1},
new Object[]{0, 2},
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
index 976cc34..f730adf 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToSignedIntValueTest.java
@@ -40,11 +40,11 @@ public class LongValueToSignedIntValueTest {
@Test(expected=IllegalArgumentException.class)
public void testUpperOutOfRange() throws Exception {
- assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+ translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse);
}
@Test(expected=IllegalArgumentException.class)
public void testLowerOutOfRange() throws Exception {
- assertEquals(new IntValue(), translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse));
+ translator.translate(new LongValue((long)Integer.MIN_VALUE - 1), reuse);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/33695781/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
index 6a2ae83..ca70162 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/translators/LongValueToUnsignedIntValueTest.java
@@ -34,17 +34,17 @@ public class LongValueToUnsignedIntValueTest {
@Test
public void testTranslation() throws Exception {
assertEquals(new IntValue(0), translator.translate(new LongValue(0L), reuse));
- assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+ assertEquals(new IntValue(Integer.MIN_VALUE), translator.translate(new LongValue((long) Integer.MAX_VALUE + 1), reuse));
assertEquals(new IntValue(-1), translator.translate(new LongValue((1L << 32) - 1), reuse));
}
@Test(expected=IllegalArgumentException.class)
public void testUpperOutOfRange() throws Exception {
- assertEquals(new IntValue(), translator.translate(new LongValue(1L << 32), reuse));
+ translator.translate(new LongValue(1L << 32), reuse);
}
@Test(expected=IllegalArgumentException.class)
public void testLowerOutOfRange() throws Exception {
- assertEquals(new IntValue(), translator.translate(new LongValue(-1), reuse));
+ translator.translate(new LongValue(-1), reuse);
}
}