You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/01 17:56:05 UTC
[4/6] flink git commit: [FLINK-2275] [tests] Migrated test from
execute() to collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 0232464..4259b63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.javaApiOperators;
import java.util.Collection;
import java.util.Date;
+import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -37,12 +38,8 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -54,22 +51,6 @@ public class ReduceITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
/*
@@ -82,15 +63,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
groupBy(1).reduce(new Tuple3Reduce("B-)"));
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
- expected = "1,1,Hi\n" +
+ String expected = "1,1,Hi\n" +
"5,2,B-)\n" +
"15,3,B-)\n" +
"34,4,B-)\n" +
"65,5,B-)\n" +
"111,6,B-)\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -105,10 +87,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
groupBy(4,0).reduce(new Tuple5Reduce());
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+ .collect();
- expected = "1,1,0,Hallo,1\n" +
+ String expected = "1,1,0,Hallo,1\n" +
"2,3,2,Hallo Welt wie,1\n" +
"2,2,1,Hallo Welt,2\n" +
"3,9,0,P-),2\n" +
@@ -118,6 +100,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
"5,11,10,GHI,1\n" +
"5,29,0,P-),2\n" +
"5,25,0,P-),3\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -132,15 +116,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
- expected = "1,1,Hi\n" +
+ String expected = "1,1,Hi\n" +
"5,2,B-)\n" +
"15,3,B-)\n" +
"34,4,B-)\n" +
"65,5,B-)\n" +
"111,6,B-)\n";
+
+ compareResultAsTuples(result, expected);
}
public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
@@ -163,15 +148,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<CustomType> reduceDs = ds.
groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
- reduceDs.writeAsText(resultPath);
- env.execute();
+ List<CustomType> result = reduceDs.collect();
- expected = "1,0,Hi\n" +
+ String expected = "1,0,Hi\n" +
"2,3,Hello!\n" +
"3,12,Hello!\n" +
"4,30,Hello!\n" +
"5,60,Hello!\n" +
"6,105,Hello!\n";
+
+ compareResultAsText(result, expected);
}
public static class KeySelector2 implements KeySelector<CustomType, Integer> {
@@ -194,10 +180,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
reduce(new AllAddingTuple3Reduce());
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
- expected = "231,91,Hello World\n";
+ String expected = "231,91,Hello World\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -212,10 +199,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<CustomType> reduceDs = ds.
reduce(new AllAddingCustomTypeReduce());
- reduceDs.writeAsText(resultPath);
- env.execute();
+ List<CustomType> result = reduceDs.collect();
+
+ String expected = "91,210,Hello!";
- expected = "91,210,Hello!";
+ compareResultAsText(result, expected);
}
@Test
@@ -232,15 +220,16 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
groupBy(1).reduce(new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
- expected = "1,1,Hi\n" +
+ String expected = "1,1,Hi\n" +
"5,2,55\n" +
"15,3,55\n" +
"34,4,55\n" +
"65,5,55\n" +
"111,6,55\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -255,10 +244,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds .
groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+ .collect();
- expected = "1,1,0,Hallo,1\n" +
+ String expected = "1,1,0,Hallo,1\n" +
"2,3,2,Hallo Welt wie,1\n" +
"2,2,1,Hallo Welt,2\n" +
"3,9,0,P-),2\n" +
@@ -268,6 +257,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
"5,11,10,GHI,1\n" +
"5,29,0,P-),2\n" +
"5,25,0,P-),3\n";
+
+ compareResultAsTuples(result, expected);
}
public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
@@ -291,10 +282,10 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
groupBy("f4","f0").reduce(new Tuple5Reduce());
- reduceDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs
+ .collect();
- expected = "1,1,0,Hallo,1\n" +
+ String expected = "1,1,0,Hallo,1\n" +
"2,3,2,Hallo Welt wie,1\n" +
"2,2,1,Hallo Welt,2\n" +
"3,9,0,P-),2\n" +
@@ -304,6 +295,8 @@ public class ReduceITCase extends MultipleProgramsTestBase {
"5,11,10,GHI,1\n" +
"5,29,0,P-),2\n" +
"5,25,0,P-),3\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -317,9 +310,11 @@ public class ReduceITCase extends MultipleProgramsTestBase {
DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
- res.writeAsText(resultPath);
- env.execute();
- expected = "ok\nok";
+ List<String> result = res.collect();
+
+ String expected = "ok\nok";
+
+ compareResultAsText(result, expected);
}
public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
@@ -369,20 +364,20 @@ public class ReduceITCase extends MultipleProgramsTestBase {
out.collect("ok");
}
}
-
+
public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
private final String f2Replace;
-
- public Tuple3Reduce() {
+
+ public Tuple3Reduce() {
this.f2Replace = null;
}
-
- public Tuple3Reduce(String f2Replace) {
+
+ public Tuple3Reduce(String f2Replace) {
this.f2Replace = f2Replace;
}
-
+
@Override
public Tuple3<Integer, Long, String> reduce(
@@ -397,41 +392,41 @@ public class ReduceITCase extends MultipleProgramsTestBase {
return out;
}
}
-
+
public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
private static final long serialVersionUID = 1L;
private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
-
+
@Override
public Tuple5<Integer, Long, Integer, String, Long> reduce(
Tuple5<Integer, Long, Integer, String, Long> in1,
Tuple5<Integer, Long, Integer, String, Long> in2)
- throws Exception {
-
+ throws Exception {
+
out.setFields(in1.f0, in1.f1+in2.f1, 0, "P-)", in1.f4);
return out;
}
}
-
+
public static class CustomTypeReduce implements ReduceFunction<CustomType> {
private static final long serialVersionUID = 1L;
private final CustomType out = new CustomType();
-
+
@Override
public CustomType reduce(CustomType in1, CustomType in2)
throws Exception {
-
+
out.myInt = in1.myInt;
out.myLong = in1.myLong + in2.myLong;
out.myString = "Hello!";
return out;
}
}
-
+
public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-
+
@Override
public Tuple3<Integer, Long, String> reduce(
Tuple3<Integer, Long, String> in1,
@@ -441,37 +436,37 @@ public class ReduceITCase extends MultipleProgramsTestBase {
return out;
}
}
-
+
public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
private static final long serialVersionUID = 1L;
private final CustomType out = new CustomType();
-
+
@Override
public CustomType reduce(CustomType in1, CustomType in2)
throws Exception {
-
+
out.myInt = in1.myInt + in2.myInt;
out.myLong = in1.myLong + in2.myLong;
out.myString = "Hello!";
return out;
}
}
-
+
public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
private String f2Replace = "";
-
+
@Override
public void open(Configuration config) {
-
+
Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
int sum = 0;
for(Integer i : ints) {
sum += i;
}
f2Replace = sum+"";
-
+
}
@Override
@@ -483,5 +478,5 @@ public class ReduceITCase extends MultipleProgramsTestBase {
return out;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
index c7ca37d..8cc54aa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
@@ -19,6 +19,8 @@
package org.apache.flink.test.javaApiOperators;
+import java.util.List;
+
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ReplicatingInputFormat;
@@ -32,11 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.NumberSequenceIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -51,23 +49,6 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
-
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath); // 500500 = 0+1+2+3+...+999+1000
- }
-
@Test
public void testReplicatedSourceToJoin() throws Exception {
/*
@@ -85,11 +66,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
.projectFirst(0)
.sum(0);
- pairs.writeAsText(resultPath);
- env.execute();
+ List<Tuple> result = pairs.collect();
- expectedResult = "(500500)";
+ String expectedResult = "(500500)";
+ compareResultAsText(result, expectedResult);
}
@Test
@@ -120,11 +101,11 @@ public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
})
.sum(0);
- pairs.writeAsText(resultPath);
- env.execute();
+ List<Tuple1<Long>> result = pairs.collect();
- expectedResult = "(500500)";
+ String expectedResult = "(500500)";
+ compareResultAsText(result, expectedResult);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
index d961f3a..2b7226b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
@@ -31,16 +31,13 @@ import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.Serializable;
import java.util.Iterator;
+import java.util.List;
@RunWith(Parameterized.class)
public class SortPartitionITCase extends MultipleProgramsTestBase {
@@ -49,22 +46,6 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testSortPartitionByKeyField() throws Exception {
/*
@@ -75,16 +56,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(4);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(4) // parallelize input
.sortPartition(1, Order.DESCENDING)
.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
@Test
@@ -97,19 +77,19 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(2);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(2) // parallelize input
.sortPartition(4, Order.ASCENDING)
.sortPartition(2, Order.DESCENDING)
.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testSortPartitionByFieldExpression() throws Exception {
/*
@@ -120,16 +100,15 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(4);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(4) // parallelize input
.sortPartition("f1", Order.DESCENDING)
.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
@Test
@@ -142,17 +121,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(2);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(2) // parallelize input
.sortPartition("f4", Order.ASCENDING)
.sortPartition("f2", Order.DESCENDING)
.mapPartition(new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(new Tuple5Checker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
@Test
@@ -165,17 +143,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(3);
DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(3) // parallelize input
.sortPartition("f0.f1", Order.ASCENDING)
.sortPartition("f1", Order.DESCENDING)
.mapPartition(new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(new NestedTupleChecker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
@Test
@@ -188,17 +165,16 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(3);
DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
- ds
+ List result = ds
.map(new IdMapper()).setParallelism(1) // parallelize input
.sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
.sortPartition("number", Order.DESCENDING)
.mapPartition(new OrderCheckMapper<POJO>(new PojoChecker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
@Test
@@ -211,15 +187,14 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
env.setParallelism(3);
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- ds
+ List result = ds
.sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
.mapPartition(new OrderCheckMapper<Tuple3<Integer, Long, String>>(new Tuple3Checker()))
- .distinct()
- .writeAsText(resultPath);
+ .distinct().collect();
- env.execute();
+ String expected = "(true)\n";
- expected = "(true)\n";
+ compareResultAsText(result, expected);
}
public static interface OrderChecker<T> extends Serializable {
@@ -237,7 +212,7 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
@Override
public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1,
- Tuple5<Integer, Long, Integer, String, Long> t2) {
+ Tuple5<Integer, Long, Integer, String, Long> t2) {
return t1.f4 < t2.f4 || t1.f4 == t2.f4 && t1.f2 >= t2.f2;
}
}
@@ -245,19 +220,18 @@ public class SortPartitionITCase extends MultipleProgramsTestBase {
public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
@Override
public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1,
- Tuple2<Tuple2<Integer, Integer>, String> t2) {
+ Tuple2<Tuple2<Integer, Integer>, String> t2) {
return t1.f0.f1 < t2.f0.f1 ||
t1.f0.f1 == t2.f0.f1 && t1.f1.compareTo(t2.f1) >= 0;
- }
+ }
}
public static class PojoChecker implements OrderChecker<POJO> {
@Override
- public boolean inOrder(POJO t1,
- POJO t2) {
+ public boolean inOrder(POJO t1, POJO t2) {
return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 ||
t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 &&
- t1.number >= t2.number;
+ t1.number >= t2.number;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index e6367c3..e5bdc19 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.test.javaApiOperators;
+import java.util.List;
+
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
@@ -25,11 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -40,22 +38,6 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testSumMaxAndProject() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -66,10 +48,11 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
.andMax(1)
.project(0, 1);
- sumDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Integer, Long>> result = sumDs.collect();
+
+ String expected = "231,6\n";
- expected = "231,6\n";
+ compareResultAsTuples(result, expected);
}
@Test
@@ -85,15 +68,16 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
.sum(0)
.project(1, 0);
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple2<Long, Integer>> result = aggregateDs.collect();
- expected = "1,1\n" +
+ String expected = "1,1\n" +
"2,5\n" +
"3,15\n" +
"4,34\n" +
"5,65\n" +
"6,111\n";
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -110,9 +94,10 @@ public class SumMinMaxITCase extends MultipleProgramsTestBase {
.min(0)
.project(0);
- aggregateDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple1<Integer>> result = aggregateDs.collect();
+
+ String expected = "1\n";
- expected = "1\n";
+ compareResultAsTuples(result, expected);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
index 350227a..a2c10bc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -44,25 +45,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 3;
private int curProgId = config.getInteger("ProgramId", -1);
- private String resultPath;
- private String expectedResult;
public TypeHintITCase(Configuration config) {
super(config);
}
@Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- }
- @Override
protected void testProgram() throws Exception {
- expectedResult = TypeHintProgs.runProgram(curProgId, resultPath);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+ TypeHintProgs.runProgram(curProgId);
}
@Parameters
@@ -81,7 +71,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
private static class TypeHintProgs {
- public static String runProgram(int progId, String resultPath) throws Exception {
+ public static void runProgram(int progId) throws Exception {
switch(progId) {
// Test identity map with missing types and string type hint
case 1: {
@@ -91,13 +81,14 @@ public class TypeHintITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
.returns("Tuple3<Integer, Long, String>");
- identityMapDs.writeAsText(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
- // return expected result
- return "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
+
+ compareResultAsText(result, expectedResult);
+ break;
}
// Test identity map with missing types and type information type hint
case 2: {
@@ -108,32 +99,34 @@ public class TypeHintITCase extends JavaProgramTestBase {
// all following generics get erased during compilation
.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
- identityMapDs.writeAsText(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = identityMapDs
+ .collect();
+
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
- // return expected result
- return "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
+ compareResultAsText(result, expectedResult);
+ break;
}
// Test flat map with class type hint
case 3: {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- @SuppressWarnings({ "rawtypes", "unchecked" })
DataSet<Integer> identityMapDs = ds.
flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
- .returns((Class) Integer.class);
- identityMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "2\n" +
- "3\n" +
- "1\n";
+ .returns(Integer.class);
+ List<Integer> result = identityMapDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ break;
}
- default:
+ default:
throw new IllegalArgumentException("Invalid program id");
}
}
@@ -150,7 +143,7 @@ public class TypeHintITCase extends JavaProgramTestBase {
return (V) value;
}
}
-
+
public static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 2e7ae9c..7ab2764 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -18,15 +18,13 @@
package org.apache.flink.test.javaApiOperators;
+import java.util.List;
+
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.flink.api.java.DataSet;
@@ -61,22 +59,6 @@ public class UnionITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
@Test
public void testUnion2IdenticalDataSets() throws Exception {
/*
@@ -87,10 +69,11 @@ public class UnionITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
- unionDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = unionDs.collect();
+
+ String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
- expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+ compareResultAsTuples(result, expected);
}
@Test
@@ -107,11 +90,13 @@ public class UnionITCase extends MultipleProgramsTestBase {
.union(CollectionDataSets.get3TupleDataSet(env))
.union(CollectionDataSets.get3TupleDataSet(env));
- unionDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = unionDs.collect();
- expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
+ String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
+ + FULL_TUPLE_3_STRING +
FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
+
+ compareResultAsTuples(result, expected);
}
@Test
@@ -128,10 +113,11 @@ public class UnionITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
.union(empty);
- unionDs.writeAsCsv(resultPath);
- env.execute();
+ List<Tuple3<Integer, Long, String>> result = unionDs.collect();
- expected = FULL_TUPLE_3_STRING;
+ String expected = FULL_TUPLE_3_STRING;
+
+ compareResultAsTuples(result, expected);
}
public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
@@ -142,5 +128,5 @@ public class UnionITCase extends MultipleProgramsTestBase {
return false;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a137321a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index a68fd82..1faf4c1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
@@ -313,6 +314,19 @@ public class CollectionDataSets {
}
}
+ public static class CustomTypeComparator implements Comparator<CustomType> {
+ @Override
+ public int compare(CustomType o1, CustomType o2) {
+ int diff = o1.myInt - o2.myInt;
+ if (diff != 0) {
+ return diff;
+ }
+ diff = (int) (o1.myLong - o2.myLong);
+ return diff != 0 ? diff : o1.myString.compareTo(o2.myString);
+ }
+
+ }
+
public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<Tuple7<Integer, String, Integer, Integer, Long, String, Long>>();
data.add(new Tuple7<Integer, String, Integer, Integer, Long, String, Long>(1, "First", 10, 100, 1000L, "One", 10000L));