You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 16:27:27 UTC
[3/5] flink git commit: [hotfix] [tests] Refactor TypeHintITCase to
extend AbstractTestBase
[hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/244f03f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/244f03f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/244f03f3
Branch: refs/heads/master
Commit: 244f03f363a6eea709cd45f8e9f495f0ac4eca62
Parents: 98afd1d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:19:29 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100
----------------------------------------------------------------------
.../flink/test/operators/TypeHintITCase.java | 371 +++++++++----------
1 file changed, 170 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/244f03f3/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
index b634005..62b5186 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
@@ -32,227 +32,196 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.test.operators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
/**
* Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}.
*/
-@RunWith(Parameterized.class)
-public class TypeHintITCase extends JavaProgramTestBase {
+public class TypeHintITCase extends AbstractTestBase {
- private static final int NUM_PROGRAMS = 9;
+ @Test
+ public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
- private final int curProgId;
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+ .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+ .returns("Tuple3<Integer, Long, String>");
+ List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
- public TypeHintITCase(int curProgId) {
- this.curProgId = curProgId;
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
+
+ compareResultAsText(result, expectedResult);
}
- @Override
- protected void testProgram() throws Exception {
- TypeHintProgs.runProgram(curProgId);
+ @Test
+ public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+ // all following generics get erased during compilation
+ .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+ .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+ List<Tuple3<Integer, Long, String>> result = identityMapDs
+ .collect();
+
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
+
+ compareResultAsText(result, expectedResult);
}
- @Parameters
- public static Collection<Object[]> getConfigurations() {
+ @Test
+ public void testFlatMapWithClassTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
- Collection<Object[]> parameters = new ArrayList<>(NUM_PROGRAMS);
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> identityMapDs = ds
+ .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
+ .returns(Integer.class);
+ List<Integer> result = identityMapDs.collect();
- for (int i = 1; i <= NUM_PROGRAMS; i++) {
- parameters.add(new Object[]{i});
- }
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
- return parameters;
+ compareResultAsText(result, expectedResult);
}
- private static class TypeHintProgs {
-
- public static void runProgram(int progId) throws Exception {
- switch(progId) {
- // Test identity map with missing types and string type hint
- case 1: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns("Tuple3<Integer, Long, String>");
- List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test identity map with missing types and type information type hint
- case 2: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- // all following generics get erased during compilation
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
- List<Tuple3<Integer, Long, String>> result = identityMapDs
- .collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat map with class type hint
- case 3: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> identityMapDs = ds
- .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
- .returns(Integer.class);
- List<Integer> result = identityMapDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test join with type information type hint
- case 4: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat join with type information type hint
- case 5: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test unsorted group reduce with type information type hint
- case 6: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test sorted group reduce with type information type hint
- case 7: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .sortGroup(0, Order.ASCENDING)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test combine group with type information type hint
- case 8: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test cogroup with type information type hint
- case 9: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .coGroup(ds2)
- .where(0)
- .equalTo(0)
- .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+ @Test
+ public void testJoinWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .join(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testFlatJoinWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .join(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testUnsortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .sortGroup(0, Order.ASCENDING)
+ .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testCombineGroupWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testCoGroupWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .coGroup(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
}
// --------------------------------------------------------------------------------------------