You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 11:59:10 UTC
[03/12] flink git commit: [FLINK-7192] [java] Activate checkstyle
flink-java/test/operator
[FLINK-7192] [java] Activate checkstyle flink-java/test/operator
This closes #4335.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca5d8afe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca5d8afe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca5d8afe
Branch: refs/heads/master
Commit: ca5d8afee8321e0dff063e8404538b132e979739
Parents: 80468b1
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:41:50 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 12:11:11 2017 +0200
----------------------------------------------------------------------
.../java/operator/AggregateOperatorTest.java | 56 +-
.../api/java/operator/CoGroupOperatorTest.java | 179 ++---
.../api/java/operator/CrossOperatorTest.java | 105 +--
.../flink/api/java/operator/DataSinkTest.java | 15 +-
.../api/java/operator/DistinctOperatorTest.java | 93 +--
.../api/java/operator/FirstNOperatorTest.java | 90 +--
.../operator/FullOuterJoinOperatorTest.java | 12 +-
.../java/operator/GroupCombineOperatorTest.java | 36 +-
.../java/operator/GroupReduceOperatorTest.java | 36 +-
.../flink/api/java/operator/GroupingTest.java | 145 ++--
.../api/java/operator/JoinOperatorTest.java | 447 +++++------
.../operator/LeftOuterJoinOperatorTest.java | 17 +-
.../api/java/operator/MaxByOperatorTest.java | 51 +-
.../api/java/operator/MinByOperatorTest.java | 54 +-
.../flink/api/java/operator/OperatorTest.java | 4 +
.../java/operator/PartitionOperatorTest.java | 746 ++++++++++---------
.../java/operator/ProjectionOperatorTest.java | 100 +--
.../api/java/operator/ReduceOperatorTest.java | 28 +-
.../operator/RightOuterJoinOperatorTest.java | 12 +-
.../api/java/operator/SortPartitionTest.java | 38 +-
20 files changed, 1179 insertions(+), 1085 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
index d9678eb..7977b68 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/AggregateOperatorTest.java
@@ -18,28 +18,32 @@
package org.apache.flink.api.java.operator;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#aggregate(Aggregations, int)}.
+ */
public class AggregateOperatorTest {
// TUPLE DATA
-
- private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -47,59 +51,59 @@ public class AggregateOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
// LONG DATA
-
+
private final List<Long> emptyLongData = new ArrayList<Long>();
-
+
@Test
public void testFieldsAggregate() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.aggregate(Aggregations.SUM, 1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work: index out of bounds
try {
tupleDs.aggregate(Aggregations.SUM, 10);
Assert.fail();
- } catch(IllegalArgumentException iae) {
+ } catch (IllegalArgumentException iae) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work: not applied to tuple dataset
DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
try {
longDs.aggregate(Aggregations.MIN, 1);
Assert.fail();
- } catch(InvalidProgramException uoe) {
+ } catch (InvalidProgramException uoe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
}
-
+
@Test
public void testAggregationTypes() {
try {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
+
// should work: multiple aggregates
tupleDs.aggregate(Aggregations.SUM, 0).and(Aggregations.MIN, 4);
// should work: nested aggregates
tupleDs.aggregate(Aggregations.MIN, 2).aggregate(Aggregations.SUM, 1);
-
+
// should not work: average on string
try {
tupleDs.aggregate(Aggregations.SUM, 2);
@@ -108,7 +112,7 @@ public class AggregateOperatorTest {
// we're good here
}
}
- catch(Exception e) {
+ catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 90a65e6..d8b5f00 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -41,14 +42,17 @@ import java.util.List;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link DataSet#coGroup(DataSet)}.
+ */
@SuppressWarnings("serial")
public class CoGroupOperatorTest {
// TUPLE DATA
- private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -56,17 +60,17 @@ public class CoGroupOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
private static List<CustomType> customTypeData = new ArrayList<CustomType>();
-
+
@BeforeClass
public static void insertCustomData() {
customTypeData.add(new CustomType());
}
-
- @Test
+
+ @Test
public void testCoGroupKeyFields1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -74,14 +78,14 @@ public class CoGroupOperatorTest {
// should work
try {
ds1.coGroup(ds2).where(0).equalTo(0);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyFields2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -89,21 +93,21 @@ public class CoGroupOperatorTest {
// should not work, incompatible cogroup key types
ds1.coGroup(ds2).where(0).equalTo(2);
}
-
+
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyFields3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should not work, incompatible number of cogroup keys
- ds1.coGroup(ds2).where(0,1).equalTo(2);
+ ds1.coGroup(ds2).where(0, 1).equalTo(2);
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testCoGroupKeyFields4() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -111,10 +115,10 @@ public class CoGroupOperatorTest {
// should not work, cogroup key out of range
ds1.coGroup(ds2).where(5).equalTo(0);
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testCoGroupKeyFields5() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -122,10 +126,10 @@ public class CoGroupOperatorTest {
// should not work, negative key field position
ds1.coGroup(ds2).where(-1).equalTo(-1);
}
-
+
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyFields6() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -144,7 +148,7 @@ public class CoGroupOperatorTest {
// should work
try {
ds1.coGroup(ds2).where("myInt").equalTo("myInt");
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -253,7 +257,7 @@ public class CoGroupOperatorTest {
ds1.coGroup(ds2).where("*").equalTo("*");
}
-
+
@Test
public void testCoGroupKeyExpressions1Nested() {
@@ -264,7 +268,7 @@ public class CoGroupOperatorTest {
// should work
try {
ds1.coGroup(ds2).where("nested.myInt").equalTo("nested.myInt");
- } catch(Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
@@ -302,10 +306,10 @@ public class CoGroupOperatorTest {
// should not work, cogroup key non-existent
ds1.coGroup(ds2).where("nested.myNonExistent").equalTo("nested.myInt");
}
-
+
@Test
public void testCoGroupKeySelectors1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -315,7 +319,7 @@ public class CoGroupOperatorTest {
ds1.coGroup(ds2)
.where(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
@@ -324,32 +328,31 @@ public class CoGroupOperatorTest {
)
.equalTo(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
}
}
);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCoGroupKeyMixing1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
-
// should work
try {
ds1.coGroup(ds2)
.where(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
@@ -357,14 +360,14 @@ public class CoGroupOperatorTest {
}
)
.equalTo(3);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCoGroupKeyMixing2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -375,21 +378,21 @@ public class CoGroupOperatorTest {
.where(3)
.equalTo(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
}
}
);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyMixing3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
@@ -399,7 +402,7 @@ public class CoGroupOperatorTest {
.where(2)
.equalTo(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
@@ -407,20 +410,20 @@ public class CoGroupOperatorTest {
}
);
}
-
+
@Test(expected = InvalidProgramException.class)
public void testCoGroupKeyMixing4() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
// should not work, more than one key field position
ds1.coGroup(ds2)
- .where(1,3)
+ .where(1, 3)
.equalTo(
new KeySelector<CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
@@ -436,32 +439,32 @@ public class CoGroupOperatorTest {
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
- CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+ CoGroupOperator<?, ?, ?> coGroupOp = tupleDs1.coGroup(tupleDs2)
.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
.with(new DummyTestCoGroupFunction1());
SemanticProperties semProps = coGroupOp.getSemanticProperties();
assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,2).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(0,2).contains(4));
- assertTrue(semProps.getForwardingTargetFields(0,3).size() == 2);
- assertTrue(semProps.getForwardingTargetFields(0,3).contains(1));
- assertTrue(semProps.getForwardingTargetFields(0,3).contains(3));
- assertTrue(semProps.getForwardingTargetFields(0,4).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,6).size() == 0);
-
- assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,2).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,3).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,4).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(1,4).contains(2));
- assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,6).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(1,6).contains(0));
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);
+
+ assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1, 6).contains(0));
assertTrue(semProps.getReadFields(0).size() == 3);
assertTrue(semProps.getReadFields(0).contains(2));
@@ -480,7 +483,7 @@ public class CoGroupOperatorTest {
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
- CoGroupOperator<?,?,?> coGroupOp = tupleDs1.coGroup(tupleDs2)
+ CoGroupOperator<?, ?, ?> coGroupOp = tupleDs1.coGroup(tupleDs2)
.where(new DummyTestKeySelector()).equalTo(new DummyTestKeySelector())
.with(new DummyTestCoGroupFunction2())
.withForwardedFieldsFirst("2;4->0")
@@ -488,26 +491,26 @@ public class CoGroupOperatorTest {
SemanticProperties semProps = coGroupOp.getSemanticProperties();
- assertTrue(semProps.getForwardingTargetFields(0,0).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,1).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,2).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,3).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,4).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(0,4).contains(2));
- assertTrue(semProps.getForwardingTargetFields(0,5).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(0,6).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(0,6).contains(0));
-
- assertTrue(semProps.getForwardingTargetFields(1,0).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,1).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,2).size() == 1);
- assertTrue(semProps.getForwardingTargetFields(1,2).contains(4));
- assertTrue(semProps.getForwardingTargetFields(1,3).size() == 2);
- assertTrue(semProps.getForwardingTargetFields(1,3).contains(1));
- assertTrue(semProps.getForwardingTargetFields(1,3).contains(3));
- assertTrue(semProps.getForwardingTargetFields(1,4).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,5).size() == 0);
- assertTrue(semProps.getForwardingTargetFields(1,6).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
+ assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));
+
+ assertTrue(semProps.getForwardingTargetFields(1, 0).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 1).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 2).size() == 1);
+ assertTrue(semProps.getForwardingTargetFields(1, 2).contains(4));
+ assertTrue(semProps.getForwardingTargetFields(1, 3).size() == 2);
+ assertTrue(semProps.getForwardingTargetFields(1, 3).contains(1));
+ assertTrue(semProps.getForwardingTargetFields(1, 3).contains(3));
+ assertTrue(semProps.getForwardingTargetFields(1, 4).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 5).size() == 0);
+ assertTrue(semProps.getForwardingTargetFields(1, 6).size() == 0);
assertTrue(semProps.getReadFields(0).size() == 3);
assertTrue(semProps.getReadFields(0).contains(2));
@@ -517,7 +520,7 @@ public class CoGroupOperatorTest {
assertTrue(semProps.getReadFields(1) == null);
}
- public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
@Override
public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
return new Tuple2<Long, Integer>();
@@ -528,7 +531,7 @@ public class CoGroupOperatorTest {
@FunctionAnnotation.ForwardedFieldsSecond("2;4->0")
@FunctionAnnotation.ReadFieldsFirst("0;2;4")
@FunctionAnnotation.ReadFieldsSecond("1;3")
- public static class DummyTestCoGroupFunction1
+ private static class DummyTestCoGroupFunction1
implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
Tuple5<Integer, Long, String, Long, Integer>,
Tuple5<Integer, Long, String, Long, Integer>> {
@@ -541,7 +544,7 @@ public class CoGroupOperatorTest {
}
@FunctionAnnotation.ReadFieldsFirst("0;1;2")
- public static class DummyTestCoGroupFunction2
+ private static class DummyTestCoGroupFunction2
implements CoGroupFunction<Tuple5<Integer, Long, String, Long, Integer>,
Tuple5<Integer, Long, String, Long, Integer>,
Tuple5<Integer, Long, String, Long, Integer>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index 59d2d61..22b03fc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -31,6 +32,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for {@link DataSet#cross(DataSet)}.
+ */
public class CrossOperatorTest {
// TUPLE DATA
@@ -64,11 +68,11 @@ public class CrossOperatorTest {
try {
ds1.cross(ds2)
.projectFirst(0);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection21() {
@@ -80,7 +84,7 @@ public class CrossOperatorTest {
try {
ds1.cross(ds2)
.projectFirst(0);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -95,12 +99,12 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectFirst(0,3);
- } catch(Exception e) {
+ .projectFirst(0, 3);
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection22() {
@@ -111,8 +115,8 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectFirst(0,3);
- } catch(Exception e) {
+ .projectFirst(0, 3);
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -129,11 +133,11 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst(0)
.projectSecond(3);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection23() {
@@ -146,7 +150,7 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst(0)
.projectSecond(3);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -161,15 +165,15 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectFirst(0,2)
- .projectSecond(1,4)
+ .projectFirst(0, 2)
+ .projectSecond(1, 4)
.projectFirst(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection24() {
@@ -180,10 +184,10 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectFirst(0,2)
- .projectSecond(1,4)
+ .projectFirst(0, 2)
+ .projectSecond(1, 4)
.projectFirst(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
@@ -199,14 +203,14 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectSecond(0,2)
- .projectFirst(1,4)
+ .projectSecond(0, 2)
+ .projectFirst(1, 4)
.projectFirst(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection25() {
@@ -217,10 +221,10 @@ public class CrossOperatorTest {
// should work
try {
ds1.cross(ds2)
- .projectSecond(0,2)
- .projectFirst(1,4)
+ .projectSecond(0, 2)
+ .projectFirst(1, 4)
.projectFirst(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -237,11 +241,11 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst()
.projectSecond();
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection26() {
@@ -254,7 +258,7 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst()
.projectSecond();
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -270,12 +274,12 @@ public class CrossOperatorTest {
try {
ds1.cross(ds2)
.projectSecond()
- .projectFirst(1,4);
- } catch(Exception e) {
+ .projectFirst(1, 4);
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testCrossProjection27() {
@@ -287,13 +291,13 @@ public class CrossOperatorTest {
try {
ds1.cross(ds2)
.projectSecond()
- .projectFirst(1,4);
- } catch(Exception e) {
+ .projectFirst(1, 4);
+ } catch (Exception e) {
Assert.fail();
}
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection8() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -304,8 +308,8 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst(5);
}
-
- @Test(expected=IndexOutOfBoundsException.class)
+
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection28() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -317,7 +321,7 @@ public class CrossOperatorTest {
.projectFirst(5);
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection9() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -329,7 +333,7 @@ public class CrossOperatorTest {
.projectSecond(5);
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection29() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -351,8 +355,8 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectFirst(2);
}
-
- @Test(expected=IndexOutOfBoundsException.class)
+
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection30() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -375,7 +379,7 @@ public class CrossOperatorTest {
.projectSecond(2);
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection31() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -386,7 +390,7 @@ public class CrossOperatorTest {
ds1.cross(ds2)
.projectSecond(-1);
}
-
+
public void testCrossProjection12() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -398,8 +402,8 @@ public class CrossOperatorTest {
.projectSecond(2)
.projectFirst(1);
}
-
- @Test(expected=IndexOutOfBoundsException.class)
+
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection32() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -412,7 +416,7 @@ public class CrossOperatorTest {
.projectFirst(-1);
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection13() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -425,7 +429,7 @@ public class CrossOperatorTest {
.projectFirst(5);
}
- @Test(expected=IndexOutOfBoundsException.class)
+ @Test(expected = IndexOutOfBoundsException.class)
public void testCrossProjection14() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -437,12 +441,12 @@ public class CrossOperatorTest {
.projectFirst(0)
.projectSecond(5);
}
-
+
/*
* ####################################################################
*/
- public static class CustomType implements Serializable {
+ private static class CustomType implements Serializable {
private static final long serialVersionUID = 1L;
@@ -450,7 +454,8 @@ public class CrossOperatorTest {
public long myLong;
public String myString;
- public CustomType() {}
+ public CustomType() {
+ }
public CustomType(int i, long l, String s) {
myInt = i;
@@ -460,7 +465,7 @@ public class CrossOperatorTest {
@Override
public String toString() {
- return myInt+","+myLong+","+myString;
+ return myInt + "," + myLong + "," + myString;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 0493583..ab8398f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.operator;
import org.apache.flink.api.common.InvalidProgramException;
@@ -25,6 +26,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -33,6 +35,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for {@link DataSet#writeAsText(String)}.
+ */
public class DataSinkTest {
// TUPLE DATA
@@ -48,7 +53,7 @@ public class DataSinkTest {
@Before
public void fillPojoData() {
- if(pojoData.isEmpty()) {
+ if (pojoData.isEmpty()) {
pojoData.add(new CustomType());
}
}
@@ -186,7 +191,7 @@ public class DataSinkTest {
final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
DataSet<Long> longDs = env
- .generateSequence(0,2);
+ .generateSequence(0, 2);
// should work
try {
@@ -203,7 +208,7 @@ public class DataSinkTest {
final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
DataSet<Long> longDs = env
- .generateSequence(0,2);
+ .generateSequence(0, 2);
// must not work
longDs.writeAsText("/tmp/willNotHappen")
@@ -216,7 +221,7 @@ public class DataSinkTest {
final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
DataSet<Long> longDs = env
- .generateSequence(0,2);
+ .generateSequence(0, 2);
// must not work
longDs.writeAsText("/tmp/willNotHappen")
@@ -229,7 +234,7 @@ public class DataSinkTest {
final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
DataSet<Long> longDs = env
- .generateSequence(0,2);
+ .generateSequence(0, 2);
// must not work
longDs.writeAsText("/tmp/willNotHappen")
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index 0bbeeb2..b768389 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -18,27 +18,31 @@
package org.apache.flink.api.java.operator;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+import org.junit.Assert;
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#distinct()}.
+ */
public class DistinctOperatorTest {
// TUPLE DATA
- private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -46,52 +50,52 @@ public class DistinctOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
// LONG DATA
private final List<Long> emptyLongData = new ArrayList<Long>();
-
+
private final List<CustomType> customTypeData = new ArrayList<CustomType>();
-
- @Test
+
+ @Test
public void testDistinctByKeyFields1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.distinct(0);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
- @Test(expected = InvalidProgramException.class)
+
+ @Test(expected = InvalidProgramException.class)
public void testDistinctByKeyFields2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
// should not work: distinct on basic type
longDs.distinct(0);
}
-
- @Test(expected = InvalidProgramException.class)
+
+ @Test(expected = InvalidProgramException.class)
public void testDistinctByKeyFields3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
this.customTypeData.add(new CustomType());
-
+
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should not work: distinct on custom type
customDs.distinct(0);
-
+
}
-
+
@Test
public void testDistinctByKeyFields4() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -101,17 +105,17 @@ public class DistinctOperatorTest {
@Test
public void testDistinctByKeyFields5() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
this.customTypeData.add(new CustomType());
-
+
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should work
customDs.distinct();
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testDistinctByKeyFields6() {
@@ -134,29 +138,29 @@ public class DistinctOperatorTest {
Assert.fail();
}
}
-
+
@Test
@SuppressWarnings("serial")
public void testDistinctByKeySelector1() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
this.customTypeData.add(new CustomType());
-
+
try {
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should work
customDs.distinct(
new KeySelector<DistinctOperatorTest.CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
}
});
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
}
@Test
@@ -166,7 +170,7 @@ public class DistinctOperatorTest {
DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
// should work
longDs.distinct();
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -206,14 +210,17 @@ public class DistinctOperatorTest {
public List<Integer> myInts;
}
+ /**
+ * Custom data type, for testing purposes.
+ */
public static class CustomType implements Serializable {
-
+
private static final long serialVersionUID = 1L;
-
+
public int myInt;
public long myLong;
public String myString;
-
+
public CustomType() {}
public CustomType(int i, long l, String s) {
@@ -221,11 +228,11 @@ public class DistinctOperatorTest {
myLong = l;
myString = s;
}
-
+
@Override
public String toString() {
- return myInt+","+myLong+","+myString;
+ return myInt + "," + myLong + "," + myString;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
index aaf744c..b02b318 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FirstNOperatorTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.api.java.operator;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,17 +25,24 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#first(int)}.
+ */
public class FirstNOperatorTest {
// TUPLE DATA
-
- private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -46,127 +50,127 @@ public class FirstNOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
@Test
public void testUngroupedFirstN() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.first(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should work
try {
tupleDs.first(10);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == 0
try {
tupleDs.first(0);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == -1
try {
tupleDs.first(-1);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
}
-
+
@Test
public void testGroupedFirstN() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.groupBy(2).first(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should work
try {
- tupleDs.groupBy(1,3).first(10);
- } catch(Exception e) {
+ tupleDs.groupBy(1, 3).first(10);
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == 0
try {
tupleDs.groupBy(0).first(0);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == -1
try {
tupleDs.groupBy(2).first(-1);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
public void testGroupedSortedFirstN() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(1);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should work
try {
- tupleDs.groupBy(1,3).sortGroup(4, Order.ASCENDING).first(10);
- } catch(Exception e) {
+ tupleDs.groupBy(1, 3).sortGroup(4, Order.ASCENDING).first(10);
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == 0
try {
tupleDs.groupBy(0).sortGroup(4, Order.ASCENDING).first(0);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
-
+
// should not work n == -1
try {
tupleDs.groupBy(2).sortGroup(4, Order.ASCENDING).first(-1);
Assert.fail();
- } catch(InvalidProgramException ipe) {
+ } catch (InvalidProgramException ipe) {
// we're good here
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index 9f2aa41..d52f59f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -28,11 +28,15 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Tests for {@link DataSet#fullOuterJoin(DataSet)}.
+ */
public class FullOuterJoinOperatorTest {
// TUPLE DATA
@@ -201,7 +205,6 @@ public class FullOuterJoinOperatorTest {
this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
}
-
private void testFullOuterStrategies(JoinHint hint) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -214,13 +217,12 @@ public class FullOuterJoinOperatorTest {
.with(new DummyJoin());
}
-
/*
* ####################################################################
*/
@SuppressWarnings("serial")
- public static class DummyJoin implements
+ private static class DummyJoin implements
JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
@Override
@@ -230,7 +232,7 @@ public class FullOuterJoinOperatorTest {
}
@SuppressWarnings("serial")
- public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+ private static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
@Override
public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
@@ -239,7 +241,7 @@ public class FullOuterJoinOperatorTest {
}
@SuppressWarnings("serial")
- public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+ private static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
@Override
public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
index 4870d29..336219b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupCombineOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
import java.util.ArrayList;
@@ -38,13 +39,16 @@ import java.util.List;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link DataSet#combineGroup(GroupCombineFunction)}.
+ */
@SuppressWarnings("serial")
public class GroupCombineOperatorTest {
private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -52,14 +56,14 @@ public class GroupCombineOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
@Test
public void testSemanticPropsWithKeySelector1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.combineGroup(new DummyGroupCombineFunction1());
@@ -95,7 +99,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.combineGroup(new DummyGroupCombineFunction1());
@@ -134,7 +138,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.combineGroup(new DummyGroupCombineFunction2())
.withForwardedFields("0->4;1;1->3;2");
@@ -171,7 +175,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.combineGroup(new DummyGroupCombineFunction2())
@@ -211,7 +215,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.combineGroup(new DummyGroupCombineFunction3())
.withForwardedFields("4->0;3;3->1;2");
@@ -245,7 +249,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.combineGroup(new DummyGroupCombineFunction3())
@@ -282,7 +286,7 @@ public class GroupCombineOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> combineOp =
+ GroupCombineOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> combineOp =
tupleDs.groupBy(new DummyTestKeySelector())
.combineGroup(new DummyGroupCombineFunction4());
@@ -308,7 +312,7 @@ public class GroupCombineOperatorTest {
assertTrue(semProps.getReadFields(0) == null);
}
- public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
@Override
public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
return new Tuple2<Long, Integer>();
@@ -317,27 +321,27 @@ public class GroupCombineOperatorTest {
@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
@FunctionAnnotation.ReadFields("0;3;4")
- public static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupCombineFunction1 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
@FunctionAnnotation.ReadFields("0;3;4")
- public static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupCombineFunction2 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
- public static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupCombineFunction3 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
@FunctionAnnotation.NonForwardedFields("2;4")
- public static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupCombineFunction4 implements GroupCombineFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void combine(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
index 0bfe566..62380eb 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupReduceOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
import java.util.ArrayList;
@@ -38,13 +39,16 @@ import java.util.List;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link DataSet#reduceGroup(GroupReduceFunction)}.
+ */
@SuppressWarnings("serial")
public class GroupReduceOperatorTest {
private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -52,14 +56,14 @@ public class GroupReduceOperatorTest {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
-
+
@Test
public void testSemanticPropsWithKeySelector1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.reduceGroup(new DummyGroupReduceFunction1());
@@ -95,7 +99,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.reduceGroup(new DummyGroupReduceFunction1());
@@ -134,7 +138,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.reduceGroup(new DummyGroupReduceFunction2())
.withForwardedFields("0->4;1;1->3;2");
@@ -171,7 +175,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.reduceGroup(new DummyGroupReduceFunction2())
@@ -211,7 +215,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.reduceGroup(new DummyGroupReduceFunction3())
.withForwardedFields("4->0;3;3->1;2");
@@ -245,7 +249,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
.reduceGroup(new DummyGroupReduceFunction3())
@@ -282,7 +286,7 @@ public class GroupReduceOperatorTest {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>,Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
+ GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
tupleDs.groupBy(new DummyTestKeySelector())
.reduceGroup(new DummyGroupReduceFunction4());
@@ -308,7 +312,7 @@ public class GroupReduceOperatorTest {
assertTrue(semProps.getReadFields(0) == null);
}
- public static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
+ private static class DummyTestKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Tuple2<Long, Integer>> {
@Override
public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, String, Long, Integer> value) throws Exception {
return new Tuple2<Long, Integer>();
@@ -317,27 +321,27 @@ public class GroupReduceOperatorTest {
@FunctionAnnotation.ForwardedFields("0->4;1;1->3;2")
@FunctionAnnotation.ReadFields("0;3;4")
- public static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupReduceFunction1 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
@FunctionAnnotation.ReadFields("0;3;4")
- public static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupReduceFunction2 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
- public static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupReduceFunction3 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
}
@FunctionAnnotation.NonForwardedFields("2;4")
- public static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
+ private static class DummyGroupReduceFunction4 implements GroupReduceFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> {
@Override
public void reduce(Iterable<Tuple5<Integer, Long, String, Long, Integer>> values, Collector<Tuple5<Integer, Long, String, Long, Integer>> out) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5d8afe/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 18b17b5..0117cf1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -18,10 +18,6 @@
package org.apache.flink.api.java.operator;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -34,16 +30,24 @@ import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link DataSet#groupBy(int...)}.
+ */
public class GroupingTest {
// TUPLE DATA
- private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
-
- private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
@@ -62,64 +66,64 @@ public class GroupingTest {
// LONG DATA
private final List<Long> emptyLongData = new ArrayList<Long>();
-
+
private final List<CustomType> customTypeData = new ArrayList<CustomType>();
private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
-
+
private final List<Tuple2<byte[], byte[]>> byteArrayData = new ArrayList<Tuple2<byte[], byte[]>>();
- @Test
+ @Test
public void testGroupByKeyFields1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.groupBy(0);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
- @Test(expected = InvalidProgramException.class)
+
+ @Test(expected = InvalidProgramException.class)
public void testGroupByKeyFields2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
// should not work: groups on basic type
longDs.groupBy(0);
}
-
- @Test(expected = InvalidProgramException.class)
+
+ @Test(expected = InvalidProgramException.class)
public void testGroupByKeyFields3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
this.customTypeData.add(new CustomType());
-
+
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should not work: groups on custom type
customDs.groupBy(0);
-
+
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testGroupByKeyFields4() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should not work, key out of tuple bounds
tupleDs.groupBy(5);
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testGroupByKeyFields5() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
@@ -148,7 +152,7 @@ public class GroupingTest {
// should work
try {
ds.groupBy("myInt");
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -198,7 +202,7 @@ public class GroupingTest {
// should work
try {
ds.groupBy("nested.myInt");
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -212,48 +216,48 @@ public class GroupingTest {
// should not work, key out of tuple bounds
ds.groupBy("nested.myNonExistent");
}
-
+
@Test
@SuppressWarnings("serial")
public void testGroupByKeySelector1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
this.customTypeData.add(new CustomType());
-
+
try {
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should work
customDs.groupBy(
new KeySelector<GroupingTest.CustomType, Long>() {
-
+
@Override
public Long getKey(CustomType value) {
return value.myLong;
}
});
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test
@SuppressWarnings("serial")
public void testGroupByKeySelector2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
this.customTypeData.add(new CustomType());
-
+
try {
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
// should work
customDs.groupBy(
new KeySelector<GroupingTest.CustomType, Tuple2<Integer, Long>>() {
@Override
- public Tuple2<Integer,Long> getKey(CustomType value) {
+ public Tuple2<Integer, Long> getKey(CustomType value) {
return new Tuple2<Integer, Long>(value.myInt, value.myLong);
}
});
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -261,7 +265,7 @@ public class GroupingTest {
@Test
@SuppressWarnings("serial")
public void testGroupByKeySelector3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
this.customTypeData.add(new CustomType());
@@ -275,7 +279,7 @@ public class GroupingTest {
return value;
}
});
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -297,7 +301,7 @@ public class GroupingTest {
return new Tuple2<Integer, CustomType>(value.myInt, value);
}
});
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -319,38 +323,38 @@ public class GroupingTest {
}
});
}
-
+
@Test
public void testGroupSortKeyFields1() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
-
+
@Test(expected = IndexOutOfBoundsException.class)
public void testGroupSortKeyFields2() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should not work, field index out of bounds
tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING);
-
+
}
-
+
@Test(expected = InvalidProgramException.class)
public void testGroupSortKeyFields3() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
-
+
// should not work: sorted groups on groupings by key selectors
longDs.groupBy(new KeySelector<Long, Long>() {
private static final long serialVersionUID = 1L;
@@ -359,9 +363,9 @@ public class GroupingTest {
public Long getKey(Long value) {
return value;
}
-
+
}).sortGroup(0, Order.ASCENDING);
-
+
}
@Test(expected = InvalidProgramException.class)
@@ -387,17 +391,17 @@ public class GroupingTest {
tupleDs.groupBy(0)
.sortGroup(3, Order.ASCENDING);
}
-
+
@Test
public void testChainedGroupSortKeyFields() {
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
// should work
try {
tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -412,7 +416,7 @@ public class GroupingTest {
// should work
try {
tupleDs.groupBy("f0").sortGroup("f1", Order.ASCENDING);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -427,7 +431,7 @@ public class GroupingTest {
// should work
try {
tupleDs.groupBy("f0").sortGroup("f2.myString", Order.ASCENDING);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -444,7 +448,7 @@ public class GroupingTest {
tupleDs.groupBy("f0")
.sortGroup("f2.myString", Order.ASCENDING)
.sortGroup("f1", Order.DESCENDING);
- } catch(Exception e) {
+ } catch (Exception e) {
Assert.fail();
}
}
@@ -496,7 +500,7 @@ public class GroupingTest {
// should not work
tupleDs.groupBy(
- new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+ new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
@Override
public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
return value.f1;
@@ -521,7 +525,7 @@ public class GroupingTest {
// should not work
tupleDs.groupBy(
- new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+ new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
@Override
public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
return value.f1;
@@ -546,7 +550,7 @@ public class GroupingTest {
// should not work
tupleDs.groupBy(
- new KeySelector<Tuple4<Integer,Long,CustomType,Long[]>, Long>() {
+ new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long>() {
@Override
public Long getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception {
return value.f1;
@@ -593,19 +597,25 @@ public class GroupingTest {
dataSet.groupBy("*");
}
-
+ /**
+ * Custom data type, for testing purposes.
+ */
public static class CustomType implements Serializable {
-
+
+ /**
+ * Custom nested data type, for testing purposes.
+ */
public static class Nest {
public int myInt;
}
+
private static final long serialVersionUID = 1L;
-
+
public int myInt;
public long myLong;
public String myString;
public Nest nested;
-
+
public CustomType() {}
public CustomType(int i, long l, String s) {
@@ -613,13 +623,16 @@ public class GroupingTest {
myLong = l;
myString = s;
}
-
+
@Override
public String toString() {
- return myInt+","+myLong+","+myString;
+ return myInt + "," + myLong + "," + myString;
}
}
+ /**
+ * Custom non-nested data type, for testing purposes.
+ */
public static class CustomType2 implements Serializable {
public int myInt;