You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/06/25 23:25:40 UTC
[2/5] flink git commit: [FLINK-2264] [gelly] changed the tests to use
collect instead of temp files
http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
index 120e97a..99f893e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -18,7 +18,10 @@
package org.apache.flink.graph.test.operations;
+import java.util.List;
+
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
@@ -27,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -42,22 +41,8 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
@Test
public void testJoinWithVertexSet() throws Exception {
/*
@@ -69,17 +54,19 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices()
+ Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices()
.map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long,Long>> data = res.getVertices();
+ List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,2\n" +
"2,4\n" +
"3,6\n" +
"4,8\n" +
"5,10\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -93,17 +80,19 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
+ Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices().first(3)
.map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper());
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long,Long>> data = res.getVertices();
+ List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,2\n" +
"2,4\n" +
"3,6\n" +
"4,4\n" +
"5,5\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -117,17 +106,19 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3)
+ Graph<Long, Long, Long> res = graph.joinWithVertices(graph.getVertices().first(3)
.map(new ProjectIdWithTrue()), new DoubleIfTrueMapper());
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long,Long>> data = res.getVertices();
+ List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,2\n" +
"2,4\n" +
"3,6\n" +
"4,4\n" +
"5,5\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -141,17 +132,19 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
+ Graph<Long, Long, Long> res = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env),
new ProjectSecondMapper());
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long,Long>> data = res.getVertices();
+ List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,10\n" +
"2,20\n" +
"3,30\n" +
"4,40\n" +
"5,5\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -164,17 +157,19 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
+ Graph<Long, Long, Long> res = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env),
new CustomValueMapper());
- result.getVertices().writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long,Long>> data = res.getVertices();
+ List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,10\n" +
"2,20\n" +
"3,30\n" +
"4,40\n" +
"5,5\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
index d1ba9a5..b3e9b11 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.test.operations;
+import java.util.List;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -43,22 +41,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
@Test
public void testWithSameValue() throws Exception {
/*
@@ -70,9 +54,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
+ List<Edge<Long, Long>> result= mappedEdges.collect();
+
expectedResult = "1,2,13\n" +
"1,3,14\n" +
"2,3,24\n" +
@@ -80,6 +63,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
"3,5,36\n" +
"4,5,46\n" +
"5,1,52\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -93,9 +78,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
+ List<Edge<Long, String>> result= mappedEdges.collect();
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
expectedResult = "1,2,string(12)\n" +
"1,3,string(13)\n" +
"2,3,string(23)\n" +
@@ -103,6 +87,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
"3,5,string(35)\n" +
"4,5,string(45)\n" +
"5,1,string(51)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -116,9 +102,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
+ List<Edge<Long, Tuple1<Long>>> result= mappedEdges.collect();
expectedResult = "1,2,(12)\n" +
"1,3,(13)\n" +
@@ -127,6 +111,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
"3,5,(35)\n" +
"4,5,(45)\n" +
"5,1,(51)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -140,9 +126,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
+ List<Edge<Long, DummyCustomType>> result= mappedEdges.collect();
expectedResult = "1,2,(T,12)\n" +
"1,3,(T,13)\n" +
@@ -151,6 +135,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
"3,5,(T,35)\n" +
"4,5,(T,45)\n" +
"5,1,(T,51)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -165,9 +151,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
new ToCustomParametrizedTypeMapper()).getEdges();
-
- mappedEdges.writeAsCsv(resultPath);
- env.execute();
+ List<Edge<Long, DummyCustomParameterizedType<Double>>> result= mappedEdges.collect();
expectedResult = "1,2,(12.0,12)\n" +
"1,3,(13.0,13)\n" +
@@ -176,6 +160,8 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
"3,5,(35.0,35)\n" +
"4,5,(45.0,45)\n" +
"5,1,(51.0,51)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
index f3a63be..0c63b4e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.test.operations;
+import java.util.List;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,11 +30,7 @@ import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -43,22 +41,8 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
@Test
public void testWithSameValue() throws Exception {
/*
@@ -69,15 +53,16 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
- DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
+ DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices();
+ List<Vertex<Long, Long>> result= mappedVertices.collect();
+
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -91,15 +76,15 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
+ List<Vertex<Long, String>> result= mappedVertices.collect();
expectedResult = "1,one\n" +
"2,two\n" +
"3,three\n" +
"4,four\n" +
"5,five\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -113,15 +98,15 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
+ List<Vertex<Long, Tuple1<Long>>> result= mappedVertices.collect();
expectedResult = "1,(1)\n" +
"2,(2)\n" +
"3,(3)\n" +
"4,(4)\n" +
"5,(5)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -135,15 +120,15 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
+ List<Vertex<Long, DummyCustomType>> result= mappedVertices.collect();
expectedResult = "1,(T,1)\n" +
"2,(T,2)\n" +
"3,(T,3)\n" +
"4,(T,4)\n" +
"5,(T,5)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -158,15 +143,15 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices(
new ToCustomParametrizedTypeMapper()).getVertices();
-
- mappedVertices.writeAsCsv(resultPath);
- env.execute();
+ List<Vertex<Long, DummyCustomParameterizedType<Double>>> result= mappedVertices.collect();
expectedResult = "1,(1.0,1)\n" +
"2,(2.0,2)\n" +
"3,(3.0,3)\n" +
"4,(4.0,4)\n" +
"5,(5.0,5)\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index b03268c..c7aa45c 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.test.operations;
+import java.util.List;
+
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,11 +33,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -46,22 +44,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
@Test
public void testLowestWeightOutNeighbor() throws Exception {
/*
@@ -74,14 +58,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
+
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,1\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -96,14 +82,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,5\n" +
"2,1\n" +
"3,1\n" +
"4,3\n" +
"5,3\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -117,8 +104,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
- verticesWithAllOutNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "1,2\n" +
"1,3\n" +
@@ -127,6 +113,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
"3,5\n" +
"4,5\n" +
"5,1";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -140,8 +128,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
- verticesWithAllOutNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "1,2\n" +
"1,3\n" +
@@ -149,6 +136,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
"3,4\n" +
"3,5\n" +
"4,5";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -162,13 +151,14 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
- verticesWithAllOutNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllOutNeighbors.collect();
expectedResult = "3,4\n" +
"3,5\n" +
"4,5\n" +
"5,1";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -182,8 +172,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
- verticesWithAllInNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "1,5\n" +
"2,1\n" +
@@ -192,6 +181,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
"4,3\n" +
"5,3\n" +
"5,4";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -205,14 +196,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
- verticesWithAllInNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "1,5\n" +
"2,1\n" +
"3,1\n" +
"3,2\n" +
"4,3";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -226,14 +218,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
- verticesWithAllInNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllInNeighbors.collect();
expectedResult = "3,1\n" +
"3,2\n" +
"4,3\n" +
"5,3\n" +
"5,4";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -247,8 +240,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
- verticesWithAllNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "1,2\n" +
"1,3\n" +
@@ -264,6 +256,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
"5,1\n" +
"5,3\n" +
"5,4";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -277,8 +271,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
- verticesWithAllNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "1,2\n" +
"1,3\n" +
@@ -289,6 +282,8 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
"3,5\n" +
"4,3\n" +
"4,5";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -302,12 +297,13 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
- verticesWithAllNeighbors.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithAllNeighbors.collect();
expectedResult = "5,1\n" +
"5,3\n" +
"5,4";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -322,14 +318,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
- verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithMaxEdgeWeight.collect();
expectedResult = "1,51\n" +
"2,23\n" +
"3,35\n" +
"4,45\n" +
"5,51\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -344,14 +341,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,12\n" +
"2,23\n" +
"3,34\n" +
"4,45\n" +
"5,51\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -366,14 +364,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor =
graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN);
- verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithLowestOutNeighbor.collect();
expectedResult = "1,51\n" +
"2,12\n" +
"3,13\n" +
"4,34\n" +
"5,35\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -388,14 +387,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight =
graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL);
- verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithMaxEdgeWeight.collect();
expectedResult = "1,51\n" +
"2,23\n" +
"3,35\n" +
"4,45\n" +
"5,51\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/872c3fc4/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index a9fb06e..f25391e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.test.operations;
import java.util.Iterator;
+import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,11 +35,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -49,22 +46,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
@Test
public void testSumOfOutNeighbors() throws Exception {
/*
@@ -77,14 +60,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
+
expectedResult = "1,5\n" +
"2,3\n" +
"3,9\n" +
"4,5\n" +
"5,1\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -99,14 +83,17 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
-
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSum.collect();
+
expectedResult = "1,255\n" +
"2,12\n" +
"3,59\n" +
"4,102\n" +
"5,285\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
+
+
}
@Test
@@ -122,15 +109,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,11\n" +
"2,6\n" +
"3,15\n" +
"4,12\n" +
"5,13\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -145,11 +132,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
+
expectedResult = "4,5\n" +
"5,1\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -164,11 +152,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
-
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSum.collect();
+
expectedResult = "4,102\n" +
"5,285\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -184,12 +173,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "4,12\n" +
"5,13\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -204,15 +193,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,5\n" +
"2,3\n" +
"3,9\n" +
"4,5\n" +
"5,1\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -227,14 +216,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSum.collect();
expectedResult = "1,255\n" +
"2,12\n" +
"3,59\n" +
"4,102\n" +
"5,285\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -249,15 +239,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL);
-
- verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
expectedResult = "1,10\n" +
"2,4\n" +
"3,12\n" +
"4,8\n" +
"5,8\n";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -272,9 +262,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "3,9\n" +
"3,18\n" +
@@ -282,6 +270,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,10\n" +
"5,1\n" +
"5,2";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -296,8 +286,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "3,59\n" +
"3,118\n" +
@@ -305,6 +294,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,102\n" +
"5,570\n" +
"5,285";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -319,9 +310,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
-
- verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfAllNeighborValues.collect();
expectedResult = "3,12\n" +
"3,24\n" +
@@ -329,6 +318,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,16\n" +
"5,8\n" +
"5,16";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -343,9 +334,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
+
expectedResult = "1,5\n" +
"1,10\n" +
"2,3\n" +
@@ -356,6 +346,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,10\n" +
"5,1\n" +
"5,2";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -370,9 +362,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSum =
graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
-
- verticesWithSum.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSum.collect();
+
expectedResult = "1,255\n" +
"1,254\n" +
"2,12\n" +
@@ -383,6 +374,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,101\n" +
"5,285\n" +
"5,284";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@Test
@@ -398,9 +391,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
-
- verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long,Long>> result = verticesWithSumOfOutNeighborValues.collect();
expectedResult = "1,11\n" +
"1,16\n" +
@@ -412,6 +403,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
"4,17\n" +
"5,13\n" +
"5,18";
+
+ CompareResults.compareResultAsTuples(result, expectedResult);
}
@SuppressWarnings("serial")