You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/01/17 21:35:47 UTC
[4/6] flink git commit: [hotfix] [gelly] Indent Java with tabs not spaces
http://git-wip-us.apache.org/repos/asf/flink/blob/cb282067/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index 84d7722..b26bb43 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -41,11 +41,11 @@ import java.util.Objects;
@RunWith(Parameterized.class)
public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
- public ReduceOnEdgesMethodsITCase(TestExecutionMode mode){
+ public ReduceOnEdgesMethodsITCase(TestExecutionMode mode) {
super(mode);
}
- private String expectedResult;
+ private String expectedResult;
@Test
public void testLowestWeightOutNeighbor() throws Exception {
@@ -54,20 +54,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect();
-
expectedResult = "1,2\n" +
- "2,3\n" +
- "3,4\n" +
- "4,5\n" +
- "5,1\n";
-
+ "2,3\n" +
+ "3,4\n" +
+ "4,5\n" +
+ "5,1\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -78,19 +77,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,5\n" +
- "2,1\n" +
- "3,1\n" +
- "4,3\n" +
- "5,3\n";
-
+ "2,1\n" +
+ "3,1\n" +
+ "4,3\n" +
+ "5,3\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -101,20 +100,20 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
- graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
+ graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "1,2\n" +
- "1,3\n" +
- "2,3\n" +
- "3,4\n" +
- "3,5\n" +
- "4,5\n" +
- "5,1";
-
+ "1,3\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,5\n" +
+ "5,1";
+
compareResultAsTuples(result, expectedResult);
}
@@ -125,19 +124,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
- graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
+ graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "1,2\n" +
- "1,3\n" +
- "2,3\n" +
- "3,4\n" +
- "3,5\n" +
- "4,5";
-
+ "1,3\n" +
+ "2,3\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,5";
+
compareResultAsTuples(result, expectedResult);
}
@@ -148,17 +147,17 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
- graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
+ graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "3,4\n" +
- "3,5\n" +
- "4,5\n" +
- "5,1";
-
+ "3,5\n" +
+ "4,5\n" +
+ "5,1";
+
compareResultAsTuples(result, expectedResult);
}
@@ -169,20 +168,20 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
- graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
+ graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "1,5\n" +
- "2,1\n" +
- "3,1\n" +
- "3,2\n" +
- "4,3\n" +
- "5,3\n" +
- "5,4";
-
+ "2,1\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "4,3\n" +
+ "5,3\n" +
+ "5,4";
+
compareResultAsTuples(result, expectedResult);
}
@@ -193,18 +192,18 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
- graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
+ graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "1,5\n" +
- "2,1\n" +
- "3,1\n" +
- "3,2\n" +
- "4,3";
-
+ "2,1\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "4,3";
+
compareResultAsTuples(result, expectedResult);
}
@@ -215,18 +214,18 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
- graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
+ graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "3,1\n" +
- "3,2\n" +
- "4,3\n" +
- "5,3\n" +
- "5,4";
-
+ "3,2\n" +
+ "4,3\n" +
+ "5,3\n" +
+ "5,4";
+
compareResultAsTuples(result, expectedResult);
}
@@ -237,27 +236,27 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
- graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
+ graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "1,2\n" +
- "1,3\n" +
- "1,5\n" +
- "2,1\n" +
- "2,3\n" +
- "3,1\n" +
- "3,2\n" +
- "3,4\n" +
- "3,5\n" +
- "4,3\n" +
- "4,5\n" +
- "5,1\n" +
- "5,3\n" +
- "5,4";
-
+ "1,3\n" +
+ "1,5\n" +
+ "2,1\n" +
+ "2,3\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,3\n" +
+ "4,5\n" +
+ "5,1\n" +
+ "5,3\n" +
+ "5,4";
+
compareResultAsTuples(result, expectedResult);
}
@@ -268,22 +267,22 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
- graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
+ graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "1,2\n" +
- "1,3\n" +
- "1,5\n" +
- "3,1\n" +
- "3,2\n" +
- "3,4\n" +
- "3,5\n" +
- "4,3\n" +
- "4,5";
-
+ "1,3\n" +
+ "1,5\n" +
+ "3,1\n" +
+ "3,2\n" +
+ "3,4\n" +
+ "3,5\n" +
+ "4,3\n" +
+ "4,5";
+
compareResultAsTuples(result, expectedResult);
}
@@ -294,16 +293,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
- graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
+ graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "5,1\n" +
- "5,3\n" +
- "5,4";
-
+ "5,3\n" +
+ "5,4";
+
compareResultAsTuples(result, expectedResult);
}
@@ -314,19 +313,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* of a vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
- graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+ DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
+ graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithMaxEdgeWeight.collect();
expectedResult = "1,51\n" +
- "2,23\n" +
- "3,35\n" +
- "4,45\n" +
- "5,51\n";
-
+ "2,23\n" +
+ "3,35\n" +
+ "4,45\n" +
+ "5,51\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -337,19 +336,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* of each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,12\n" +
- "2,23\n" +
- "3,34\n" +
- "4,45\n" +
- "5,51\n";
-
+ "2,23\n" +
+ "3,34\n" +
+ "4,45\n" +
+ "5,51\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -360,19 +359,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* of each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
- graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN);
+ DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
+ graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,51\n" +
- "2,12\n" +
- "3,13\n" +
- "4,34\n" +
- "5,35\n";
-
+ "2,12\n" +
+ "3,13\n" +
+ "4,34\n" +
+ "5,35\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -383,19 +382,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
* of a vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
- graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
+ DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
+ graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithMaxEdgeWeight.collect();
expectedResult = "1,51\n" +
- "2,23\n" +
- "3,35\n" +
- "4,45\n" +
- "5,51\n";
-
+ "2,23\n" +
+ "3,35\n" +
+ "4,45\n" +
+ "5,51\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -404,12 +403,12 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
+
long weight = Long.MAX_VALUE;
long minNeighborId = 0;
- for (Edge<Long, Long> edge: edges) {
+ for (Edge<Long, Long> edge : edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
minNeighborId = edge.getTarget();
@@ -424,11 +423,11 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
+
long weight = Long.MIN_VALUE;
- for (Edge<Long, Long> edge: edges) {
+ for (Edge<Long, Long> edge : edges) {
if (edge.getValue() > weight) {
weight = edge.getValue();
}
@@ -460,12 +459,12 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Vertex<Long, Long> v,
- Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
+
long weight = Long.MAX_VALUE;
long minNeighborId = 0;
-
- for (Edge<Long, Long> edge: edges) {
+
+ for (Edge<Long, Long> edge : edges) {
if (edge.getValue() < weight) {
weight = edge.getValue();
minNeighborId = edge.getSource();
@@ -480,9 +479,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
- for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
@@ -493,10 +492,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
- for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
- if(edge.f0 != 5) {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if (edge.f0 != 5) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
@@ -505,13 +504,14 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class SelectOutNeighborsValueGreaterThanTwo implements
- EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
- for (Edge<Long, Long> edge: edges) {
- if(v.getValue() > 2) {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for (Edge<Long, Long> edge : edges) {
+ if (v.getValue() > 2) {
out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
}
}
@@ -523,9 +523,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
- for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
}
}
@@ -536,10 +536,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
- for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
- if(edge.f0 != 5) {
+ for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+ if (edge.f0 != 5) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
}
}
@@ -548,13 +548,14 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class SelectInNeighborsValueGreaterThanTwo implements
- EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
- for (Edge<Long, Long> edge: edges) {
- if(v.getValue() > 2) {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for (Edge<Long, Long> edge : edges) {
+ if (v.getValue() > 2) {
out.collect(new Tuple2<>(v.getId(), edge.getSource()));
}
}
@@ -566,7 +567,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
if (Objects.equals(edge.f0, edge.f1.getTarget())) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
@@ -582,9 +583,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@Override
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
- if(edge.f0 != 5 && edge.f0 != 2) {
+ if (edge.f0 != 5 && edge.f0 != 2) {
if (Objects.equals(edge.f0, edge.f1.getTarget())) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
} else {
@@ -597,14 +599,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class SelectNeighborsValueGreaterThanFour implements
- EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+ EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
- Collector<Tuple2<Long, Long>> out) throws Exception {
- for(Edge<Long, Long> edge : edges) {
- if(v.getValue() > 4) {
- if(v.getId().equals(edge.getTarget())) {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
+ for (Edge<Long, Long> edge : edges) {
+ if (v.getValue() > 4) {
+ if (v.getId().equals(edge.getTarget())) {
out.collect(new Tuple2<>(v.getId(), edge.getSource()));
} else {
out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
http://git-wip-us.apache.org/repos/asf/flink/blob/cb282067/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index a352bb4..7fad2e8 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -41,11 +41,11 @@ import java.util.List;
@RunWith(Parameterized.class)
public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
- public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){
+ public ReduceOnNeighborMethodsITCase(TestExecutionMode mode) {
super(mode);
}
- private String expectedResult;
+ private String expectedResult;
@Test
public void testSumOfOutNeighbors() throws Exception {
@@ -54,19 +54,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
+
expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
-
+ "2,3\n" +
+ "3,9\n" +
+ "4,5\n" +
+ "5,1\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -77,22 +77,20 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* times the edge weights for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithSum.collect();
-
+
expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
-
+ "2,12\n" +
+ "3,59\n" +
+ "4,102\n" +
+ "5,285\n";
+
compareResultAsTuples(result, expectedResult);
-
-
}
@Test
@@ -103,19 +101,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,11\n" +
- "2,6\n" +
- "3,15\n" +
- "4,12\n" +
- "5,13\n";
-
+ "2,6\n" +
+ "3,15\n" +
+ "4,12\n" +
+ "5,13\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -127,15 +125,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
+ graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
+
expectedResult = "4,5\n" +
- "5,1\n";
-
+ "5,1\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -147,15 +145,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
+ graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithSum.collect();
-
+
expectedResult = "4,102\n" +
- "5,285\n";
-
+ "5,285\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -168,15 +166,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
+ graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "4,12\n" +
- "5,13\n";
-
+ "5,13\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -187,19 +185,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT);
+ DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+ graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,5\n" +
- "2,3\n" +
- "3,9\n" +
- "4,5\n" +
- "5,1\n";
-
+ "2,3\n" +
+ "3,9\n" +
+ "4,5\n" +
+ "5,1\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -210,19 +208,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* times the edge weights for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+ DataSet<Tuple2<Long, Long>> verticesWithSum =
+ graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithSum.collect();
-
+
expectedResult = "1,255\n" +
- "2,12\n" +
- "3,59\n" +
- "4,102\n" +
- "5,285\n";
-
+ "2,12\n" +
+ "3,59\n" +
+ "4,102\n" +
+ "5,285\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -233,19 +231,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
* for each vertex
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
+ graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithSumOfAllNeighborValues.collect();
-
+
expectedResult = "1,10\n" +
- "2,4\n" +
- "3,12\n" +
- "4,8\n" +
- "5,8\n";
-
+ "2,4\n" +
+ "3,12\n" +
+ "4,8\n" +
+ "5,8\n";
+
compareResultAsTuples(result, expectedResult);
}
@@ -257,19 +255,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
+ graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "3,9\n" +
- "3,18\n" +
- "4,5\n" +
- "4,10\n" +
- "5,1\n" +
- "5,2";
-
+ "3,18\n" +
+ "4,5\n" +
+ "4,10\n" +
+ "5,1\n" +
+ "5,2";
+
compareResultAsTuples(result, expectedResult);
}
@@ -281,19 +279,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
+ graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "3,59\n" +
- "3,118\n" +
- "4,204\n" +
- "4,102\n" +
- "5,570\n" +
- "5,285";
-
+ "3,118\n" +
+ "4,204\n" +
+ "4,102\n" +
+ "5,570\n" +
+ "5,285";
+
compareResultAsTuples(result, expectedResult);
}
@@ -305,19 +303,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
+ graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithSumOfAllNeighborValues.collect();
expectedResult = "3,12\n" +
- "3,24\n" +
- "4,8\n" +
- "4,16\n" +
- "5,8\n" +
- "5,16";
-
+ "3,24\n" +
+ "4,8\n" +
+ "4,16\n" +
+ "5,8\n" +
+ "5,16";
+
compareResultAsTuples(result, expectedResult);
}
@@ -329,23 +327,23 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
+ graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
-
+
expectedResult = "1,5\n" +
- "1,10\n" +
- "2,3\n" +
- "2,6\n" +
- "3,9\n" +
- "3,18\n" +
- "4,5\n" +
- "4,10\n" +
- "5,1\n" +
- "5,2";
-
+ "1,10\n" +
+ "2,3\n" +
+ "2,6\n" +
+ "3,9\n" +
+ "3,18\n" +
+ "4,5\n" +
+ "4,10\n" +
+ "5,1\n" +
+ "5,2";
+
compareResultAsTuples(result, expectedResult);
}
@@ -357,23 +355,23 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSum =
- graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
+ graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
List<Tuple2<Long, Long>> result = verticesWithSum.collect();
-
+
expectedResult = "1,255\n" +
- "1,254\n" +
- "2,12\n" +
- "2,11\n" +
- "3,59\n" +
- "3,58\n" +
- "4,102\n" +
- "4,101\n" +
- "5,285\n" +
- "5,284";
-
+ "1,254\n" +
+ "2,12\n" +
+ "2,11\n" +
+ "3,59\n" +
+ "3,58\n" +
+ "4,102\n" +
+ "4,101\n" +
+ "5,285\n" +
+ "5,284";
+
compareResultAsTuples(result, expectedResult);
}
@@ -386,35 +384,35 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
- TestGraphUtils.getLongLongEdgeData(env), env);
+ TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
- graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
+ graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,11\n" +
- "1,16\n" +
- "2,6\n" +
- "2,11\n" +
- "3,15\n" +
- "3,20\n" +
- "4,12\n" +
- "4,17\n" +
- "5,13\n" +
- "5,18";
-
+ "1,16\n" +
+ "2,6\n" +
+ "2,11\n" +
+ "3,15\n" +
+ "3,20\n" +
+ "4,12\n" +
+ "4,17\n" +
+ "5,13\n" +
+ "5,18";
+
compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")
- private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumOutNeighbors implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
@@ -424,14 +422,14 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumInNeighbors implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
@@ -441,14 +439,14 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumAllNeighbors implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
-
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
@@ -458,57 +456,57 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumOutNeighborsIdGreaterThanThree implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
- sum += neighbor.f1.getValue();
+ sum += neighbor.f1.getValue();
}
- if(vertex.getId() > 3) {
+ if (vertex.getId() > 3) {
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
}
@SuppressWarnings("serial")
- private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumInNeighborsIdGreaterThanThree implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
- if(vertex.getId() > 3) {
+ if (vertex.getId() > 3) {
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
}
@SuppressWarnings("serial")
- private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumAllNeighborsIdGreaterThanThree implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
- if(vertex.getId() > 3) {
+ if (vertex.getId() > 3) {
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
}
}
@@ -524,12 +522,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
+
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
@@ -541,12 +539,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements
+ NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
@@ -554,7 +552,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighbor;
sum += next.f2.getValue();
}
- if(next.f0 > 2) {
+ if (next.f0 > 2) {
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
@@ -562,12 +560,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements
+ NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
@@ -575,7 +573,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighbor;
sum += next.f2.getValue() * next.f1.getValue();
}
- if(next.f0 > 2) {
+ if (next.f0 > 2) {
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
@@ -583,12 +581,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements
+ NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
@@ -596,7 +594,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
next = neighbor;
sum += next.f2.getValue();
}
- if(next.f0 > 2) {
+ if (next.f0 > 2) {
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
@@ -604,13 +602,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumOutNeighborsMultipliedByTwo implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
@@ -622,13 +620,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumInNeighborsSubtractOne implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
@@ -640,13 +638,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
}
@SuppressWarnings("serial")
- private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long,
- Tuple2<Long, Long>> {
+ private static final class SumAllNeighborsAddFive implements
+ NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
@Override
public void iterateNeighbors(Vertex<Long, Long> vertex,
- Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
- Collector<Tuple2<Long, Long>> out) throws Exception {
+ Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+ Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {