You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:13 UTC
[04/15] flink git commit: [FLINK-6709] [gelly] Activate strict
checkstyle for flink-gellies
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
new file mode 100644
index 0000000..9f9bc06
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.linkanalysis.HITS.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link HITS}.
+ */
+public class HITSTest
+extends AsmTestBase {
+
+ /*
+ * This test result can be verified with the following Python script.
+
+ import math
+ import networkx as nx
+
+ graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+ hits=nx.algorithms.link_analysis.hits(graph)
+
+ hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+ authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+ for key in sorted(hits[0]):
+     print('{}: {}, {}'.format(key, hits[0][key]/hubbiness_norm, hits[1][key]/authority_norm))
+ */
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(20)
+ .run(directedSimpleGraph);
+
+ List<Tuple2<Double, Double>> expectedResults = new ArrayList<>();
+ expectedResults.add(Tuple2.of(0.544643396306, 0.0));
+ expectedResults.add(Tuple2.of(0.0, 0.836329395866));
+ expectedResults.add(Tuple2.of(0.607227031134, 0.268492526138));
+ expectedResults.add(Tuple2.of(0.544643396306, 0.395444899355));
+ expectedResults.add(Tuple2.of(0.0, 0.268492526138));
+ expectedResults.add(Tuple2.of(0.194942233447, 0.0));
+
+ for (Result<IntValue> result : hits.collect()) {
+ int id = result.f0.getValue();
+ assertEquals(expectedResults.get(id).f0, result.getHubScore().getValue(), 0.000001);
+ assertEquals(expectedResults.get(id).f1, result.getAuthorityScore().getValue(), 0.000001);
+ }
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ double expectedScore = 1.0 / Math.sqrt(completeGraphVertexCount);
+
+ DataSet<Result<LongValue>> hits = new HITS<LongValue, NullValue, NullValue>(0.000001)
+ .run(completeGraph);
+
+ List<Result<LongValue>> results = hits.collect();
+
+ assertEquals(completeGraphVertexCount, results.size());
+
+ for (Result<LongValue> result : results) {
+ assertEquals(expectedScore, result.getHubScore().getValue(), 0.000001);
+ assertEquals(expectedScore, result.getAuthorityScore().getValue(), 0.000001);
+ }
+ }
+
+ /*
+ * This test result can be verified with the following Python script.
+
+ import math
+ import networkx as nx
+
+ graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+ hits=nx.algorithms.link_analysis.hits(graph)
+
+ hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+ authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+ for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+     print('{}: {}, {}'.format(key, hits[0][str(key)]/hubbiness_norm, hits[1][str(key)]/authority_norm))
+ */
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ DataSet<Result<LongValue>> hits = directedRMatGraph(10, 16)
+ .run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+
+ Map<Long, Result<LongValue>> results = new HashMap<>();
+ for (Result<LongValue> result : new Collect<Result<LongValue>>().run(hits).execute()) {
+ results.put(result.f0.getValue(), result);
+ }
+
+ assertEquals(902, results.size());
+
+ Map<Long, Tuple2<Double, Double>> expectedResults = new HashMap<>();
+ // a pseudo-random selection of results, both high and low
+ expectedResults.put(0L, Tuple2.of(0.231077034747, 0.238110214937));
+ expectedResults.put(1L, Tuple2.of(0.162364053933, 0.169679504287));
+ expectedResults.put(2L, Tuple2.of(0.162412612499, 0.161015667261));
+ expectedResults.put(8L, Tuple2.of(0.167064641724, 0.158592966505));
+ expectedResults.put(13L, Tuple2.of(0.041915595624, 0.0407091625629));
+ expectedResults.put(29L, Tuple2.of(0.0102017346511, 0.0146218045999));
+ expectedResults.put(109L, Tuple2.of(0.00190531000389, 0.00481944993023));
+ expectedResults.put(394L, Tuple2.of(0.0122287016161, 0.0147987969538));
+ expectedResults.put(652L, Tuple2.of(0.010966659242, 0.0113713306749));
+ expectedResults.put(1020L, Tuple2.of(0.0, 0.000326973732127));
+
+ for (Map.Entry<Long, Tuple2<Double, Double>> expected : expectedResults.entrySet()) {
+ double hubScore = results.get(expected.getKey()).getHubScore().getValue();
+ double authorityScore = results.get(expected.getKey()).getAuthorityScore().getValue();
+
+ assertEquals(expected.getValue().f0, hubScore, 0.00001);
+ assertEquals(expected.getValue().f1, authorityScore, 0.00001);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
new file mode 100644
index 0000000..9c3de71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link PageRank}.
+ */
+public class PageRankTest
+extends AsmTestBase {
+
+ private static final double DAMPING_FACTOR = 0.85;
+
+ /*
+ * This test result can be verified with the following Python script.
+
+ import networkx as nx
+
+ graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+ pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+ for key in sorted(pagerank):
+     print('{}: {}'.format(key, pagerank[key]))
+ */
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ DataSet<Result<IntValue>> pr = new PageRank<IntValue, NullValue, NullValue>(DAMPING_FACTOR, 10)
+ .run(directedSimpleGraph);
+
+ List<Double> expectedResults = new ArrayList<>();
+ expectedResults.add(0.09091296131286301);
+ expectedResults.add(0.27951855944178117);
+ expectedResults.add(0.12956847924535586);
+ expectedResults.add(0.22329643739217675);
+ expectedResults.add(0.18579060129496028);
+ expectedResults.add(0.09091296131286301);
+
+ for (Tuple2<IntValue, DoubleValue> result : pr.collect()) {
+ int id = result.f0.getValue();
+ assertEquals(expectedResults.get(id), result.f1.getValue(), 0.000001);
+ }
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ double expectedScore = 1.0 / completeGraphVertexCount;
+
+ DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+ .run(completeGraph);
+
+ List<Result<LongValue>> results = pr.collect();
+
+ assertEquals(completeGraphVertexCount, results.size());
+
+ for (Tuple2<LongValue, DoubleValue> result : results) {
+ assertEquals(expectedScore, result.f1.getValue(), 0.000001);
+ }
+ }
+
+ /*
+ * This test result can be verified with the following Python script.
+
+ import networkx as nx
+
+ graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+ pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+ for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+     print('{}: {}'.format(key, pagerank[str(key)]))
+ */
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+ .run(directedRMatGraph(10, 16));
+
+ Map<Long, Result<LongValue>> results = new HashMap<>();
+ for (Result<LongValue> result : new Collect<Result<LongValue>>().run(pr).execute()) {
+ results.put(result.getVertexId0().getValue(), result);
+ }
+
+ assertEquals(902, results.size());
+
+ Map<Long, Double> expectedResults = new HashMap<>();
+ // a pseudo-random selection of results, both high and low
+ expectedResults.put(0L, 0.027111807822);
+ expectedResults.put(1L, 0.0132842310382);
+ expectedResults.put(2L, 0.0121818392504);
+ expectedResults.put(8L, 0.0115916809743);
+ expectedResults.put(13L, 0.00183249490033);
+ expectedResults.put(29L, 0.000848095047082);
+ expectedResults.put(109L, 0.000308507844048);
+ expectedResults.put(394L, 0.000828743280246);
+ expectedResults.put(652L, 0.000684102931253);
+ expectedResults.put(1020L, 0.000250487135148);
+
+ for (Map.Entry<Long, Double> expected : expectedResults.entrySet()) {
+ double value = results.get(expected.getKey()).getPageRankScore().getValue();
+
+ assertEquals(expected.getValue(), value, 0.00001);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
index 24f0c2d..9b1d18c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -22,10 +22,14 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.test.TestGraphUtils;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link ChecksumHashCode}.
+ */
public class ChecksumHashCodeTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
index 117b3ae..05042c2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.graph.library.metric.directed;
-import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link EdgeMetrics}.
+ */
public class EdgeMetricsTest
extends AsmTestBase {
@@ -47,7 +51,7 @@ extends AsmTestBase {
public void testWithCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
- long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+ long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3,
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index 54301f5..f72a7bb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.graph.library.metric.directed;
-import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link VertexMetrics}.
+ */
public class VertexMetricsTest
extends AsmTestBase {
@@ -48,7 +52,7 @@ extends AsmTestBase {
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedBidirectionalEdges = completeGraphVertexCount * expectedDegree / 2;
- long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+ long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
Result expectedResult = new Result(completeGraphVertexCount, 0, expectedBidirectionalEdges,
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
index b4e9f95..3e23906 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.graph.library.metric.undirected;
-import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link EdgeMetrics}.
+ */
public class EdgeMetricsTest
extends AsmTestBase {
@@ -47,7 +51,7 @@ extends AsmTestBase {
public void testWithCompleteGraph()
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
- long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+ long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3,
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
index 848ad79..71e587b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -18,16 +18,20 @@
package org.apache.flink.graph.library.metric.undirected;
-import org.apache.commons.math3.util.CombinatoricsUtils;
import org.apache.flink.graph.asm.AsmTestBase;
import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link VertexMetrics}.
+ */
public class VertexMetricsTest
extends AsmTestBase {
@@ -48,7 +52,7 @@ extends AsmTestBase {
throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
- long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+ long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets,
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
index 76b28da..aa259a2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
@@ -25,20 +25,24 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link AdamicAdar}.
+ */
public class AdamicAdarTest
extends AsmTestBase {
private float[] ilog = {
- 1.0f / (float)Math.log(2),
- 1.0f / (float)Math.log(3),
- 1.0f / (float)Math.log(3),
- 1.0f / (float)Math.log(4),
- 1.0f / (float)Math.log(1),
- 1.0f / (float)Math.log(1)
+ 1.0f / (float) Math.log(2),
+ 1.0f / (float) Math.log(3),
+ 1.0f / (float) Math.log(3),
+ 1.0f / (float) Math.log(4),
+ 1.0f / (float) Math.log(1),
+ 1.0f / (float) Math.log(1)
};
@Test
@@ -98,7 +102,7 @@ extends AsmTestBase {
@Test
public void testCompleteGraph()
throws Exception {
- float expectedScore = (completeGraphVertexCount - 2) / (float)Math.log(completeGraphVertexCount - 1);
+ float expectedScore = (completeGraphVertexCount - 2) / (float) Math.log(completeGraphVertexCount - 1);
DataSet<Result<LongValue>> aa = completeGraph
.run(new AdamicAdar<LongValue, NullValue, NullValue>());
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 2443359..d8cd298 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -27,10 +27,14 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link JaccardIndex}.
+ */
public class JaccardIndexTest
extends AsmTestBase {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
index fb21c14..71937db 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
@@ -38,12 +38,15 @@ import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+/**
+ * Validate compiled {@link VertexCentricIteration} programs.
+ */
public class PregelCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
@@ -51,223 +54,198 @@ public class PregelCompilerTest extends CompilerTestBase {
@SuppressWarnings("serial")
@Test
public void testPregelCompiler() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
-
- DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
- new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
- .map(new Tuple2ToVertexMap<Long, Long>());
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
- return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
- }
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ // compose test program
+ {
+
+ DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+ new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+ .map(new Tuple2ToVertexMap<Long, Long>());
+
+ DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+ return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
});
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
- new CCCompute(), null, 100).getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- }
-
- Plan p = env.createProgramPlan("Pregel Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the solution set delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof SingleInputPlanNode);
-
- SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-
- // check the computation coGroup
- DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
- assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
- assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-
- assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-
- // check that the initial partitioning is pushed out of the loop
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+ DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+ new CCCompute(), null, 100).getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
}
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+
+ Plan p = env.createProgramPlan("Pregel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+ // check the solution set delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+ SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+ // check the computation coGroup
+ DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+ assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+ assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
}
-
+
@SuppressWarnings("serial")
@Test
public void testPregelCompilerWithBroadcastVariable() {
- try {
- final String BC_VAR_NAME = "borat variable";
-
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
- DataSet<Long> bcVar = env.fromElements(1L);
-
- DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
- new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
- .map(new Tuple2ToVertexMap<Long, Long>());
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
- return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- });
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- VertexCentricConfiguration parameters = new VertexCentricConfiguration();
- parameters.addBroadcastSet(BC_VAR_NAME, bcVar);
-
- DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
- new CCCompute(), null, 100, parameters)
- .getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+ final String broadcastSetName = "broadcast";
- }
-
- Plan p = env.createProgramPlan("Pregel Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the solution set delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof SingleInputPlanNode);
-
- SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-
- // check the computation coGroup
- DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
- assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
- assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-
- assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-
- // check that the initial partitioning is pushed out of the loop
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ // compose test program
+ {
+ DataSet<Long> bcVar = env.fromElements(1L);
+
+ DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+ new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+ .map(new Tuple2ToVertexMap<Long, Long>());
+
+ DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+ return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ });
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+ parameters.addBroadcastSet(broadcastSetName, bcVar);
+
+ DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+ new CCCompute(), null, 100, parameters)
+ .getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
}
+
+ Plan p = env.createProgramPlan("Pregel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+ // check the solution set delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+ SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+ // check the computation coGroup
+ DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+ assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+ assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
}
@SuppressWarnings("serial")
@Test
public void testPregelWithCombiner() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
-
- DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
- new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
- .map(new Tuple2ToVertexMap<Long, Long>());
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
- return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
- }
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ // compose test program
+ {
+
+ DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+ new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+ .map(new Tuple2ToVertexMap<Long, Long>());
+
+ DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+ return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
});
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
- new CCCompute(), new CCCombiner(), 100).getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- }
-
- Plan p = env.createProgramPlan("Pregel Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the combiner
- SingleInputPlanNode combiner = (SingleInputPlanNode) iteration.getInput2().getSource();
- assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-
- // check the solution set delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof SingleInputPlanNode);
-
- SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-
- // check the computation coGroup
- DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
- assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
- assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
- assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-
- assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-
- // check that the initial partitioning is pushed out of the loop
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+ DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+ new CCCompute(), new CCCombiner(), 100).getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
}
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+
+ Plan p = env.createProgramPlan("Pregel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+ // check the combiner
+ SingleInputPlanNode combiner = (SingleInputPlanNode) iteration.getInput2().getSource();
+ assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+
+ // check the solution set delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+ SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+ // check the computation coGroup
+ DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+ assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+ assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+ assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
}
@SuppressWarnings("serial")
@@ -283,7 +261,7 @@ public class PregelCompilerTest extends CompilerTestBase {
if ((getSuperstepNumber() == 1) || (currentComponent < vertex.getValue())) {
setNewVertexValue(currentComponent);
- for (Edge<Long, NullValue> edge: getEdges()) {
+ for (Edge<Long, NullValue> edge : getEdges()) {
sendMessageTo(edge.getTarget(), currentComponent);
}
}
@@ -291,16 +269,15 @@ public class PregelCompilerTest extends CompilerTestBase {
}
@SuppressWarnings("serial")
- public static final class CCCombiner extends MessageCombiner<Long, Long> {
+ private static final class CCCombiner extends MessageCombiner<Long, Long> {
public void combineMessages(MessageIterator<Long> messages) {
long minMessage = Long.MAX_VALUE;
- for (Long msg: messages) {
+ for (Long msg : messages) {
minMessage = Math.min(minMessage, msg);
}
sendCombinedMessage(minMessage);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
index 3bf2e32..8084e71 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.graph.pregel;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
@@ -33,100 +32,92 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+/**
+ * Test the creation of a {@link VertexCentricIteration} program.
+ */
@SuppressWarnings("serial")
public class PregelTranslationTest {
+ private static final String ITERATION_NAME = "Test Name";
+
+ private static final String AGGREGATOR_NAME = "AggregatorName";
+
+ private static final String BC_SET_NAME = "broadcast messages";
+
+ private static final int NUM_ITERATIONS = 13;
+
+ private static final int ITERATION_parallelism = 77;
+
@Test
public void testTranslationPlainEdges() {
- try {
- final String ITERATION_NAME = "Test Name";
-
- final String AGGREGATOR_NAME = "AggregatorName";
-
- final String BC_SET_NAME = "borat messages";
-
- final int NUM_ITERATIONS = 13;
-
- final int ITERATION_parallelism = 77;
-
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Long> bcVar = env.fromElements(1L);
-
- DataSet<Vertex<String, Double>> result;
-
- // ------------ construct the test program ------------------
- {
-
- DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
-
- DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
-
- Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
- edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
-
- public Tuple3<String, String, NullValue> map(
- Tuple2<String, String> edge) {
- return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- }), env);
-
- VertexCentricConfiguration parameters = new VertexCentricConfiguration();
-
- parameters.addBroadcastSet(BC_SET_NAME, bcVar);
- parameters.setName(ITERATION_NAME);
- parameters.setParallelism(ITERATION_parallelism);
- parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-
- result = graph.runVertexCentricIteration(new MyCompute(), null,
- NUM_ITERATIONS, parameters).getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
- }
-
-
- // ------------- validate the java program ----------------
-
- assertTrue(result instanceof DeltaIterationResultSet);
-
- DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
- DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-
- // check the basic iteration properties
- assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
- assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
-
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-
- TwoInputUdfOperator<?, ?, ?, ?> computationCoGroup =
- (TwoInputUdfOperator<?, ?, ?, ?>) ((SingleInputUdfOperator<?, ?, ?>) resultSet.getNextWorkset()).getInput();
-
- // validate that the broadcast sets are forwarded
- assertEquals(bcVar, computationCoGroup.getBroadcastSets().get(BC_SET_NAME));
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> bcVar = env.fromElements(1L);
+
+ DataSet<Vertex<String, Double>> result;
+
+ // ------------ construct the test program ------------------
+
+ DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+
+ DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+
+ Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+ edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+
+ public Tuple3<String, String, NullValue> map(
+ Tuple2<String, String> edge) {
+ return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ }), env);
+
+ VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+ parameters.addBroadcastSet(BC_SET_NAME, bcVar);
+ parameters.setName(ITERATION_NAME);
+ parameters.setParallelism(ITERATION_parallelism);
+ parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+ result = graph.runVertexCentricIteration(new MyCompute(), null,
+ NUM_ITERATIONS, parameters).getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+
+ // ------------- validate the java program ----------------
+
+ assertTrue(result instanceof DeltaIterationResultSet);
+
+ DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+ DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
+
+ // check the basic iteration properties
+ assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+ assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
+ assertEquals(ITERATION_NAME, iteration.getName());
+
+ assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+ TwoInputUdfOperator<?, ?, ?, ?> computationCoGroup =
+ (TwoInputUdfOperator<?, ?, ?, ?>) ((SingleInputUdfOperator<?, ?, ?>) resultSet.getNextWorkset()).getInput();
+
+ // validate that the broadcast sets are forwarded
+ assertEquals(bcVar, computationCoGroup.getBroadcastSets().get(BC_SET_NAME));
}
-
+
// --------------------------------------------------------------------------------------------
private static final class MyCompute extends ComputeFunction<String, Double, NullValue, Double> {
@Override
- public void compute(Vertex<String, Double> vertex,
- MessageIterator<Double> messages) throws Exception {}
+ public void compute(Vertex<String, Double> vertex, MessageIterator<Double> messages) throws Exception {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 676e0cd..1c6d08e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -40,13 +40,15 @@ import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+/**
+ * Validate compiled {@link ScatterGatherIteration} programs.
+ */
public class SpargelCompilerTest extends CompilerTestBase {
private static final long serialVersionUID = 1L;
@@ -54,161 +56,143 @@ public class SpargelCompilerTest extends CompilerTestBase {
@SuppressWarnings("serial")
@Test
public void testSpargelCompiler() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
-
- DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
- new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
- .map(new Tuple2ToVertexMap<Long, Long>());
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
- return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- });
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
- new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
- new ConnectedComponents.CCUpdater<Long, Long>(), 100)
- .getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
- }
-
- Plan p = env.createProgramPlan("Spargel Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the solution set join and the delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-
- DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
- assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
- assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-
- // check the workset set join
- DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
- assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
- assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
- assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-
- assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-
- // check that the initial partitioning is pushed out of the loop
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
- assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
- assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-
- // check that the initial workset sort is outside the loop
- assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
- assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+
+ // compose test program
+ DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+ new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+ .map(new Tuple2ToVertexMap<Long, Long>());
+
+ DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+ return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ });
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+ DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
+ new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+ new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+ .getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+ Plan p = env.createProgramPlan("Spargel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+ // check the solution set join and the delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
+
+ DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+ // check the workset set join
+ DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+ assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+
+ // check that the initial workset sort is outside the loop
+ assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+ assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
}
-
+
@SuppressWarnings("serial")
@Test
public void testSpargelCompilerWithBroadcastVariable() {
- try {
- final String BC_VAR_NAME = "borat variable";
-
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(DEFAULT_PARALLELISM);
- // compose test program
- {
- DataSet<Long> bcVar = env.fromElements(1L);
-
- DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
- new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
- .map(new Tuple2ToVertexMap<Long, Long>());
-
- DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
- .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
- public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
- return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- });
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
- parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar);
- parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar);
-
- DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
- new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
- new ConnectedComponents.CCUpdater<Long, Long>(), 100)
- .getVertices();
-
- result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-
- }
-
- Plan p = env.createProgramPlan("Spargel Connected Components");
- OptimizedPlan op = compileNoStats(p);
-
- // check the sink
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
- assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-
- // check the iteration
- WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
- assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
- // check the solution set join and the delta
- PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
- assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-
- DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
- assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
- assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
- assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-
- // check the workset set join
- DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
- assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
- assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
- assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-
- assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-
- // check that the initial partitioning is pushed out of the loop
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
- assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
- assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
- assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final String broadcastVariableName = "broadcast variable";
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+
+ // compose test program
+
+ DataSet<Long> bcVar = env.fromElements(1L);
+
+ DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+ new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+ .map(new Tuple2ToVertexMap<Long, Long>());
+
+ DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+ return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ });
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+ ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+ parameters.addBroadcastSetForScatterFunction(broadcastVariableName, bcVar);
+ parameters.addBroadcastSetForGatherFunction(broadcastVariableName, bcVar);
+
+ DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
+ new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+ new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+ .getVertices();
+
+ result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+ Plan p = env.createProgramPlan("Spargel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+ // check the solution set join and the delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
+
+ DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+ // check the workset set join
+ DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+ assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 47b785d..d209a2d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -31,187 +31,156 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
+
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+/**
+ * Test the creation of a {@link ScatterGatherIteration} program.
+ */
@SuppressWarnings("serial")
public class SpargelTranslationTest {
- @Test
- public void testTranslationPlainEdges() {
- try {
- final String ITERATION_NAME = "Test Name";
-
- final String AGGREGATOR_NAME = "AggregatorName";
-
- final String BC_SET_MESSAGES_NAME = "borat messages";
+ private static final String ITERATION_NAME = "Test Name";
- final String BC_SET_UPDATES_NAME = "borat updates";
+ private static final String AGGREGATOR_NAME = "AggregatorName";
- final int NUM_ITERATIONS = 13;
+ private static final String BC_SET_MESSAGES_NAME = "borat messages";
- final int ITERATION_parallelism = 77;
+ private static final String BC_SET_UPDATES_NAME = "borat updates";
+ private static final int NUM_ITERATIONS = 13;
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ private static final int ITERATION_parallelism = 77;
- DataSet<Long> bcMessaging = env.fromElements(1L);
- DataSet<Long> bcUpdate = env.fromElements(1L);
+ @Test
+ public void testTranslationPlainEdges() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Vertex<String, Double>> result;
+ DataSet<Long> bcMessaging = env.fromElements(1L);
+ DataSet<Long> bcUpdate = env.fromElements(1L);
- // ------------ construct the test program ------------------
- {
+ DataSet<Vertex<String, Double>> result;
- DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+ // ------------ construct the test program ------------------
- DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+ DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
- Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
- edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+ DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
- public Tuple3<String, String, NullValue> map(
- Tuple2<String, String> edge) {
- return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- }), env);
+ Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+ edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
- ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+ public Tuple3<String, String, NullValue> map(
+ Tuple2<String, String> edge) {
+ return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ }), env);
- parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
- parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
- parameters.setName(ITERATION_NAME);
- parameters.setParallelism(ITERATION_parallelism);
- parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+ ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
- result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
- NUM_ITERATIONS, parameters).getVertices();
+ parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+ parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
+ parameters.setName(ITERATION_NAME);
+ parameters.setParallelism(ITERATION_parallelism);
+ parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
- result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
- }
+ result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
+ NUM_ITERATIONS, parameters).getVertices();
+ result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
- // ------------- validate the java program ----------------
+ // ------------- validate the java program ----------------
- assertTrue(result instanceof DeltaIterationResultSet);
+ assertTrue(result instanceof DeltaIterationResultSet);
- DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
- DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
+ DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+ DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
- // check the basic iteration properties
- assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
- assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
+ // check the basic iteration properties
+ assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+ assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
+ assertEquals(ITERATION_NAME, iteration.getName());
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+ assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
- // validate that the semantic properties are set as they should
- TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+ // validate that the semantic properties are set as they should
+ TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+ assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+ assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
- TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+ TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
- // validate that the broadcast sets are forwarded
- assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
- assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // validate that the broadcast sets are forwarded
+ assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+ assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
}
@Test
public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
- try {
- final String ITERATION_NAME = "Test Name";
-
- final String AGGREGATOR_NAME = "AggregatorName";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- final String BC_SET_MESSAGES_NAME = "borat messages";
+ DataSet<Long> bcVar = env.fromElements(1L);
- final String BC_SET_UPDATES_NAME = "borat updates";
+ DataSet<Vertex<String, Double>> result;
- final int NUM_ITERATIONS = 13;
+ // ------------ construct the test program ------------------
- final int ITERATION_parallelism = 77;
+ DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+ DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+ edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
- DataSet<Long> bcVar = env.fromElements(1L);
+ public Tuple3<String, String, NullValue> map(
+ Tuple2<String, String> edge) {
+ return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+ }
+ }), env);
- DataSet<Vertex<String, Double>> result;
+ ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
- // ------------ construct the test program ------------------
- {
+ parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
+ parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
+ parameters.setName(ITERATION_NAME);
+ parameters.setParallelism(ITERATION_parallelism);
+ parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
- DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+ result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
+ NUM_ITERATIONS, parameters).getVertices();
- DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+ result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
- Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
- edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+ // ------------- validate the java program ----------------
- public Tuple3<String, String, NullValue> map(
- Tuple2<String, String> edge) {
- return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
- }
- }), env);
+ assertTrue(result instanceof DeltaIterationResultSet);
- ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+ DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+ DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
- parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
- parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
- parameters.setName(ITERATION_NAME);
- parameters.setParallelism(ITERATION_parallelism);
- parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+ // check the basic iteration properties
+ assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+ assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+ assertEquals(ITERATION_parallelism, iteration.getParallelism());
+ assertEquals(ITERATION_NAME, iteration.getName());
- result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
- NUM_ITERATIONS, parameters).getVertices();
+ assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
- result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
- }
+ // validate that the semantic properties are set as they should
+ TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+ assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+ assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+ TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
- // ------------- validate the java program ----------------
-
- assertTrue(result instanceof DeltaIterationResultSet);
-
- DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
- DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-
- // check the basic iteration properties
- assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
- assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
-
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-
- // validate that the semantic properties are set as they should
- TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
- assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-
- TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-
- // validate that the broadcast sets are forwarded
- assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
- assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // validate that the broadcast sets are forwarded
+ assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+ assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
}
// --------------------------------------------------------------------------------------------
@@ -219,12 +188,14 @@ public class SpargelTranslationTest {
private static class MessageFunctionNoEdgeValue extends ScatterFunction<String, Double, Long, NullValue> {
@Override
- public void sendMessages(Vertex<String, Double> vertex) {}
+ public void sendMessages(Vertex<String, Double> vertex) {
+ }
}
private static class UpdateFunction extends GatherFunction<String, Double, Long> {
@Override
- public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
+ public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index 78f0ba2..2454b38 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -29,18 +29,19 @@ import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.util.TestLogger;
+
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Dummy iteration to test that the supersteps are correctly incremented
+ * and can be retrieved from inside the scatter and gather functions.
+ * All vertices start with value 1 and increase their value by 1
+ * in each iteration.
+ */
@SuppressWarnings("serial")
public class CollectionModeSuperstepITCase extends TestLogger {
- /**
- * Dummy iteration to test that the supersteps are correctly incremented
- * and can be retrieved from inside the scatter and gather functions.
- * All vertices start with value 1 and increase their value by 1
- * in each iteration.
- */
@Test
public void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 183522d..f866f38 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,6 +43,9 @@ import org.junit.runners.Parameterized;
import java.util.HashSet;
import java.util.List;
+/**
+ * Tests for {@link GSAConfiguration}.
+ */
@RunWith(Parameterized.class)
public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase {
@@ -235,7 +239,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet");
+ List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("gatherBcastSet");
Assert.assertEquals(1, bcastSet.get(0).intValue());
Assert.assertEquals(2, bcastSet.get(1).intValue());
Assert.assertEquals(3, bcastSet.get(2).intValue());
@@ -266,7 +270,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet");
+ List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("sumBcastSet");
Assert.assertEquals(4, bcastSet.get(0).intValue());
Assert.assertEquals(5, bcastSet.get(1).intValue());
Assert.assertEquals(6, bcastSet.get(2).intValue());
@@ -295,7 +299,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
// test bcast variable
@SuppressWarnings("unchecked")
- List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet");
+ List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("applyBcastSet");
Assert.assertEquals(7, bcastSet.get(0).intValue());
Assert.assertEquals(8, bcastSet.get(1).intValue());
Assert.assertEquals(9, bcastSet.get(2).intValue());
@@ -346,7 +350,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
}
@SuppressWarnings("serial")
- public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+ private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
public Long map(Vertex<Long, Long> value) {
return 1L;
@@ -354,7 +358,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
}
@SuppressWarnings("serial")
- public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+ private static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
@Override
public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index 3c091a9..139ff1e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.graph.spargel.ScatterGatherIteration;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,6 +44,9 @@ import org.junit.runners.Parameterized;
import java.util.HashSet;
import java.util.List;
+/**
+ * Tests for {@link ScatterGatherConfiguration}.
+ */
@RunWith(Parameterized.class)
public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
@@ -133,7 +137,6 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
new MessageFunctionDefault(), new UpdateFunctionDefault(), 5);
-
DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>());
List<Tuple2<Long, Long>> result = data.collect();
@@ -333,7 +336,6 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expectedResult);
}
-
@Test
public void testNumVerticesNotSet() throws Exception {
@@ -508,7 +510,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
// test bcast variable
@SuppressWarnings("unchecked")
- List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet");
+ List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("messagingBcastSet");
Assert.assertEquals(4, bcastSet.get(0).intValue());
Assert.assertEquals(5, bcastSet.get(1).intValue());
Assert.assertEquals(6, bcastSet.get(2).intValue());
@@ -556,7 +558,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
// test bcast variable
@SuppressWarnings("unchecked")
- List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet");
+ List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("updateBcastSet");
Assert.assertEquals(1, bcastSet.get(0).intValue());
Assert.assertEquals(2, bcastSet.get(1).intValue());
Assert.assertEquals(3, bcastSet.get(2).intValue());