You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:12 UTC
[12/22] flink git commit: [FLINK-6731] [tests] Activate strict
checkstyle for flink-tests
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
deleted file mode 100644
index 6efc565..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ReduceITCase extends MultipleProgramsTestBase {
-
- /**
-  * Creates the test case for a single execution mode.
-  *
-  * @param mode execution mode handed on to {@link MultipleProgramsTestBase}
-  */
- public ReduceITCase(TestExecutionMode mode) {
-     super(mode);
- }
-
- @Test
- public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
-     // Grouped reduce on 3-tuples, keyed by the positional field 1.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple3<Integer, Long, String>> actual = CollectionDataSets.get3TupleDataSet(env)
-         .groupBy(1)
-         .reduce(new Tuple3Reduce("B-)"))
-         .collect();
-
-     String expected =
-         "1,1,Hi\n"
-         + "5,2,B-)\n"
-         + "15,3,B-)\n"
-         + "34,4,B-)\n"
-         + "65,5,B-)\n"
-         + "111,6,B-)\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception {
-     // Grouped reduce on 5-tuples, keyed by the positional fields 4 and 0.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple5<Integer, Long, Integer, String, Long>> actual =
-         CollectionDataSets.get5TupleDataSet(env)
-             .groupBy(4, 0)
-             .reduce(new Tuple5Reduce())
-             .collect();
-
-     String expected =
-         "1,1,0,Hallo,1\n"
-         + "2,3,2,Hallo Welt wie,1\n"
-         + "2,2,1,Hallo Welt,2\n"
-         + "3,9,0,P-),2\n"
-         + "3,6,5,BCD,3\n"
-         + "4,17,0,P-),1\n"
-         + "4,17,0,P-),2\n"
-         + "5,11,10,GHI,1\n"
-         + "5,29,0,P-),2\n"
-         + "5,25,0,P-),3\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testReduceOnTuplesWithKeyExtractor() throws Exception {
-     // Grouped reduce on 3-tuples, keyed through a KeySelector function.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple3<Integer, Long, String>> actual = CollectionDataSets.get3TupleDataSet(env)
-         .groupBy(new KeySelector1())
-         .reduce(new Tuple3Reduce("B-)"))
-         .collect();
-
-     String expected =
-         "1,1,Hi\n"
-         + "5,2,B-)\n"
-         + "15,3,B-)\n"
-         + "34,4,B-)\n"
-         + "65,5,B-)\n"
-         + "111,6,B-)\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- /**
-  * Key selector that keys a 3-tuple by its second field ({@code f1}).
-  */
- public static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
-     private static final long serialVersionUID = 1L;
-
-     @Override
-     public Long getKey(Tuple3<Integer, Long, String> in) {
-         final Long key = in.f1;
-         return key;
-     }
- }
-
- @Test
- public void testReduceOnCustomTypeWithKeyExtractor() throws Exception {
-     // Grouped reduce on a custom type, keyed through a KeySelector function.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<CustomType> actual = CollectionDataSets.getCustomTypeDataSet(env)
-         .groupBy(new KeySelector2())
-         .reduce(new CustomTypeReduce())
-         .collect();
-
-     String expected =
-         "1,0,Hi\n"
-         + "2,3,Hello!\n"
-         + "3,12,Hello!\n"
-         + "4,30,Hello!\n"
-         + "5,60,Hello!\n"
-         + "6,105,Hello!\n";
-
-     compareResultAsText(actual, expected);
- }
-
- /**
-  * Key selector that keys a {@link CustomType} by its {@code myInt} field.
-  */
- public static class KeySelector2 implements KeySelector<CustomType, Integer> {
-     private static final long serialVersionUID = 1L;
-
-     @Override
-     public Integer getKey(CustomType in) {
-         final Integer key = in.myInt;
-         return key;
-     }
- }
-
- @Test
- public void testAllReduceForTuple() throws Exception {
-     // Ungrouped (all-)reduce over the whole 3-tuple data set.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple3<Integer, Long, String>> actual = CollectionDataSets.get3TupleDataSet(env)
-         .reduce(new AllAddingTuple3Reduce())
-         .collect();
-
-     compareResultAsTuples(actual, "231,91,Hello World\n");
- }
-
- @Test
- public void testAllReduceForCustomTypes() throws Exception {
-     // Ungrouped (all-)reduce over the whole custom-type data set.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<CustomType> actual = CollectionDataSets.getCustomTypeDataSet(env)
-         .reduce(new AllAddingCustomTypeReduce())
-         .collect();
-
-     compareResultAsText(actual, "91,210,Hello!");
- }
-
- @Test
- public void testReduceWithBroadcastSet() throws Exception {
-     // Grouped reduce whose function reads the broadcast set registered as "ints".
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     DataSet<Integer> broadcastInts = CollectionDataSets.getIntegerDataSet(env);
-
-     List<Tuple3<Integer, Long, String>> actual = CollectionDataSets.get3TupleDataSet(env)
-         .groupBy(1)
-         .reduce(new BCTuple3Reduce())
-         .withBroadcastSet(broadcastInts, "ints")
-         .collect();
-
-     String expected =
-         "1,1,Hi\n"
-         + "5,2,55\n"
-         + "15,3,55\n"
-         + "34,4,55\n"
-         + "65,5,55\n"
-         + "111,6,55\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testReduceATupleReturningKeySelector() throws Exception {
-     // Grouped reduce on 5-tuples, keyed by a KeySelector that returns a Tuple2.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple5<Integer, Long, Integer, String, Long>> actual =
-         CollectionDataSets.get5TupleDataSet(env)
-             .groupBy(new KeySelector3())
-             .reduce(new Tuple5Reduce())
-             .collect();
-
-     String expected =
-         "1,1,0,Hallo,1\n"
-         + "2,3,2,Hallo Welt wie,1\n"
-         + "2,2,1,Hallo Welt,2\n"
-         + "3,9,0,P-),2\n"
-         + "3,6,5,BCD,3\n"
-         + "4,17,0,P-),1\n"
-         + "4,17,0,P-),2\n"
-         + "5,11,10,GHI,1\n"
-         + "5,29,0,P-),2\n"
-         + "5,25,0,P-),3\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- public static class KeySelector3 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f4);
- }
- }
-
- @Test
- public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception {
-     // Grouped reduce on 5-tuples, keyed by the String field expressions "f4" and "f0".
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple5<Integer, Long, Integer, String, Long>> actual =
-         CollectionDataSets.get5TupleDataSet(env)
-             .groupBy("f4", "f0")
-             .reduce(new Tuple5Reduce())
-             .collect();
-
-     String expected =
-         "1,1,0,Hallo,1\n"
-         + "2,3,2,Hallo Welt wie,1\n"
-         + "2,2,1,Hallo Welt,2\n"
-         + "3,9,0,P-),2\n"
-         + "3,6,5,BCD,3\n"
-         + "4,17,0,P-),1\n"
-         + "4,17,0,P-),2\n"
-         + "5,11,10,GHI,1\n"
-         + "5,29,0,P-),2\n"
-         + "5,25,0,P-),3\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testReduceOnTupleWithMultipleKeyExpressionsWithHashHint() throws Exception {
-     // Same as the String field expression case, but with the HASH combine hint set.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     List<Tuple5<Integer, Long, Integer, String, Long>> actual =
-         CollectionDataSets.get5TupleDataSet(env)
-             .groupBy("f4", "f0")
-             .reduce(new Tuple5Reduce())
-             .setCombineHint(CombineHint.HASH)
-             .collect();
-
-     String expected =
-         "1,1,0,Hallo,1\n"
-         + "2,3,2,Hallo Welt wie,1\n"
-         + "2,2,1,Hallo Welt,2\n"
-         + "3,9,0,P-),2\n"
-         + "3,6,5,BCD,3\n"
-         + "4,17,0,P-),1\n"
-         + "4,17,0,P-),2\n"
-         + "5,11,10,GHI,1\n"
-         + "5,29,0,P-),2\n"
-         + "5,25,0,P-),3\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testSupportForDataAndEnumSerialization() throws Exception {
-     // Test support for Date and enum serialization.
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-     DataSet<PojoWithDateAndEnum> generated = env.generateSequence(0, 2).map(new Mapper1());
-     DataSet<PojoWithDateAndEnum> all =
-         generated.union(CollectionDataSets.getPojoWithDateAndEnum(env));
-
-     List<String> actual = all
-         .groupBy("group")
-         .reduceGroup(new GroupReducer1())
-         .collect();
-
-     compareResultAsText(actual, "ok\nok");
- }
-
- /**
-  * Maps the sequence values 0, 1 and 2 to fixed {@link PojoWithDateAndEnum} records:
-  * 0 and 1 become group "a" / {@code CAT_A}, 2 becomes group "b" / {@code CAT_B}.
-  * Every record carries the date {@code new Date(666)}.
-  */
- public static class Mapper1 implements MapFunction<Long, PojoWithDateAndEnum> {
-     private static final long serialVersionUID = 1L;
-
-     /**
-      * @param value sequence element; only 0, 1 and 2 are supported
-      * @return the record associated with {@code value}
-      * @throws RuntimeException if {@code value} is not 0, 1 or 2
-      */
-     @Override
-     public PojoWithDateAndEnum map(Long value) throws Exception {
-         int l = value.intValue();
-         switch (l) {
-             case 0:
-             case 1:
-                 return createPojo("a", CollectionDataSets.Category.CAT_A);
-             case 2:
-                 return createPojo("b", CollectionDataSets.Category.CAT_B);
-             default:
-                 throw new RuntimeException("Unexpected value for l=" + l);
-         }
-     }
-
-     /** Builds a record with the given group and category and the fixed date 666. */
-     private static PojoWithDateAndEnum createPojo(String group, CollectionDataSets.Category cat) {
-         PojoWithDateAndEnum pojo = new PojoWithDateAndEnum();
-         pojo.group = group;
-         pojo.date = new Date(666);
-         pojo.cat = cat;
-         return pojo;
-     }
- }
-
- public static class GroupReducer1 implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<PojoWithDateAndEnum> values,
- Collector<String> out) throws Exception {
- for(PojoWithDateAndEnum val : values) {
- if(val.cat == CollectionDataSets.Category.CAT_A) {
- Assert.assertEquals("a", val.group);
- } else if(val.cat == CollectionDataSets.Category.CAT_B) {
- Assert.assertEquals("b", val.group);
- } else {
- Assert.fail("error. Cat = "+val.cat);
- }
- Assert.assertEquals(666, val.date.getTime());
- }
- out.collect("ok");
- }
- }
-
- /**
-  * Reduce function for 3-tuples: sums {@code f0}, keeps {@code f1} of the first input and
-  * sets {@code f2} either to the configured replacement or, if none was given, to
-  * {@code f2} of the first input. The same output tuple instance is reused for every call.
-  */
- public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
-     private static final long serialVersionUID = 1L;
-
-     private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-     private final String f2Replace;
-
-     /** Creates a reducer that keeps {@code f2} of the first input. */
-     public Tuple3Reduce() {
-         this(null);
-     }
-
-     /**
-      * @param f2Replace value written to {@code f2} of every result, or {@code null} to
-      *                  keep {@code f2} of the first input
-      */
-     public Tuple3Reduce(String f2Replace) {
-         this.f2Replace = f2Replace;
-     }
-
-     @Override
-     public Tuple3<Integer, Long, String> reduce(
-             Tuple3<Integer, Long, String> in1,
-             Tuple3<Integer, Long, String> in2) throws Exception {
-
-         final String f2 = (f2Replace != null) ? f2Replace : in1.f2;
-         out.setFields(in1.f0 + in2.f0, in1.f1, f2);
-         return out;
-     }
- }
-
- /**
-  * Reduce function for 5-tuples: keeps {@code f0} and {@code f4} of the first input, sums
-  * {@code f1}, and sets {@code f2} to 0 and {@code f3} to "P-)". The same output tuple
-  * instance is reused for every call.
-  */
- public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
-     private static final long serialVersionUID = 1L;
-
-     private final Tuple5<Integer, Long, Integer, String, Long> out =
-         new Tuple5<Integer, Long, Integer, String, Long>();
-
-     @Override
-     public Tuple5<Integer, Long, Integer, String, Long> reduce(
-             Tuple5<Integer, Long, Integer, String, Long> in1,
-             Tuple5<Integer, Long, Integer, String, Long> in2) throws Exception {
-
-         final long f1Sum = in1.f1 + in2.f1;
-         out.setFields(in1.f0, f1Sum, 0, "P-)", in1.f4);
-         return out;
-     }
- }
-
- /**
-  * Reduce function for {@link CustomType}: keeps {@code myInt} of the first input, sums
-  * {@code myLong} and sets {@code myString} to "Hello!". The same output instance is
-  * reused for every call.
-  */
- public static class CustomTypeReduce implements ReduceFunction<CustomType> {
-     private static final long serialVersionUID = 1L;
-
-     private final CustomType out = new CustomType();
-
-     @Override
-     public CustomType reduce(CustomType in1, CustomType in2) throws Exception {
-         final CustomType result = out;
-         result.myInt = in1.myInt;
-         result.myLong = in1.myLong + in2.myLong;
-         result.myString = "Hello!";
-         return result;
-     }
- }
-
- /**
-  * Reduce function for 3-tuples that sums both {@code f0} and {@code f1} and sets
-  * {@code f2} to "Hello World". The same output tuple instance is reused for every call.
-  */
- public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
-     private static final long serialVersionUID = 1L;
-
-     private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-
-     @Override
-     public Tuple3<Integer, Long, String> reduce(
-             Tuple3<Integer, Long, String> in1,
-             Tuple3<Integer, Long, String> in2) throws Exception {
-
-         final int f0Sum = in1.f0 + in2.f0;
-         final long f1Sum = in1.f1 + in2.f1;
-         out.setFields(f0Sum, f1Sum, "Hello World");
-         return out;
-     }
- }
-
- /**
-  * Reduce function for {@link CustomType} that sums both {@code myInt} and {@code myLong}
-  * and sets {@code myString} to "Hello!". The same output instance is reused for every call.
-  */
- public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
-     private static final long serialVersionUID = 1L;
-
-     private final CustomType out = new CustomType();
-
-     @Override
-     public CustomType reduce(CustomType in1, CustomType in2) throws Exception {
-         final CustomType result = out;
-         result.myInt = in1.myInt + in2.myInt;
-         result.myLong = in1.myLong + in2.myLong;
-         result.myString = "Hello!";
-         return result;
-     }
- }
-
- /**
-  * Rich reduce function for 3-tuples. On {@link #open(Configuration)} it sums the
-  * broadcast variable "ints" and remembers the sum as a String; each reduce call then sums
-  * {@code f0}, keeps {@code f1} of the first input and writes that String into {@code f2}.
-  */
- public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
-     private static final long serialVersionUID = 1L;
-
-     private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-     private String f2Replace = "";
-
-     @Override
-     public void open(Configuration config) {
-         Collection<Integer> broadcast = this.getRuntimeContext().getBroadcastVariable("ints");
-
-         int total = 0;
-         for (Integer value : broadcast) {
-             total += value;
-         }
-         f2Replace = String.valueOf(total);
-     }
-
-     @Override
-     public Tuple3<Integer, Long, String> reduce(
-             Tuple3<Integer, Long, String> in1,
-             Tuple3<Integer, Long, String> in2) throws Exception {
-
-         final int f0Sum = in1.f0 + in2.f0;
-         out.setFields(f0Sum, in1.f1, this.f2Replace);
-         return out;
-     }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceWithCombinerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceWithCombinerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceWithCombinerITCase.java
deleted file mode 100644
index 685a9ac..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceWithCombinerITCase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.CombineFunction;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ReduceWithCombinerITCase extends MultipleProgramsTestBase {
-
- /**
-  * Creates the test case. The execution mode passed to the super class is always
-  * {@code CLUSTER}.
-  *
-  * @param mode ignored by this constructor
-  */
- public ReduceWithCombinerITCase(TestExecutionMode mode) {
-     // NOTE(review): 'mode' is deliberately unused here; presumably the combiner is only
-     // exercised in cluster execution - confirm before changing.
-     super(TestExecutionMode.CLUSTER);
- }
-
- @Test
- public void testReduceOnNonKeyedDataset() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // the input is rebalanced across the downstream tasks before the group reduce
-     List<Tuple2<Integer, Boolean>> actual = createNonKeyedInput(env)
-         .reduceGroup(new NonKeyedCombReducer())
-         .collect();
-
-     compareResultAsTuples(actual, "10,true\n");
- }
-
- @Test
- public void testForkingReduceOnNonKeyedDataset() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // one rebalanced input feeds two different group reducers
-     DataSet<Tuple2<Integer, Boolean>> input = createNonKeyedInput(env);
-
-     DataSet<Tuple2<Integer, Boolean>> viaCombine =
-         input.reduceGroup(new NonKeyedCombReducer());
-     DataSet<Tuple2<Integer, Boolean>> viaGroupCombine =
-         input.reduceGroup(new NonKeyedGroupCombReducer());
-
-     List<Tuple2<Integer, Boolean>> actual = viaCombine.union(viaGroupCombine).collect();
-
-     compareResultAsTuples(actual, "10,true\n10,true\n");
- }
-
- @Test
- public void testReduceOnKeyedDataset() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // the input is rebalanced across the downstream tasks, then grouped on field 0
-     List<Tuple3<String, Integer, Boolean>> actual = createKeyedInput(env)
-         .groupBy(0)
-         .reduceGroup(new KeyedCombReducer())
-         .collect();
-
-     compareResultAsTuples(actual, "k1,6,true\nk2,4,true\n");
- }
-
- @Test
- public void testReduceOnKeyedDatasetWithSelector() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // the input is rebalanced across the downstream tasks, then grouped via a KeySelector
-     DataSet<Tuple3<String, Integer, Boolean>> input = createKeyedInput(env);
-     UnsortedGrouping<Tuple3<String, Integer, Boolean>> grouped =
-         input.groupBy(new KeySelectorX());
-
-     List<Tuple3<String, Integer, Boolean>> actual =
-         grouped.reduceGroup(new KeyedCombReducer()).collect();
-
-     compareResultAsTuples(actual, "k1,6,true\nk2,4,true\n");
- }
-
- @Test
- public void testForkingReduceOnKeyedDataset() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // one grouping (on field 0) of the rebalanced input feeds two different group reducers
-     UnsortedGrouping<Tuple3<String, Integer, Boolean>> grouped =
-         createKeyedInput(env).groupBy(0);
-
-     DataSet<Tuple3<String, Integer, Boolean>> viaCombine =
-         grouped.reduceGroup(new KeyedCombReducer());
-     DataSet<Tuple3<String, Integer, Boolean>> viaGroupCombine =
-         grouped.reduceGroup(new KeyedGroupCombReducer());
-
-     List<Tuple3<String, Integer, Boolean>> actual =
-         viaCombine.union(viaGroupCombine).collect();
-
-     String expected =
-         "k1,6,true\n"
-         + "k2,4,true\n"
-         + "k1,6,true\n"
-         + "k2,4,true\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- @Test
- public void testForkingReduceOnKeyedDatasetWithSelection() throws Exception {
-     // set up the execution environment
-     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-     env.setParallelism(4);
-
-     // one KeySelector grouping of the rebalanced input feeds two different group reducers
-     UnsortedGrouping<Tuple3<String, Integer, Boolean>> grouped =
-         createKeyedInput(env).groupBy(new KeySelectorX());
-
-     DataSet<Tuple3<String, Integer, Boolean>> viaCombine =
-         grouped.reduceGroup(new KeyedCombReducer());
-     DataSet<Tuple3<String, Integer, Boolean>> viaGroupCombine =
-         grouped.reduceGroup(new KeyedGroupCombReducer());
-
-     List<Tuple3<String, Integer, Boolean>> actual =
-         viaCombine.union(viaGroupCombine).collect();
-
-     String expected =
-         "k1,6,true\n"
-         + "k2,4,true\n"
-         + "k1,6,true\n"
-         + "k2,4,true\n";
-
-     compareResultAsTuples(actual, expected);
- }
-
- /**
-  * Creates a rebalanced data set of ten {@code (1, false)} tuples.
-  *
-  * @param env environment the data set is created in
-  * @return the rebalanced input data set
-  */
- private DataSet<Tuple2<Integer, Boolean>> createNonKeyedInput(ExecutionEnvironment env) {
-     List<Tuple2<Integer, Boolean>> elements = Arrays.asList(
-         new Tuple2<>(1, false), new Tuple2<>(1, false),
-         new Tuple2<>(1, false), new Tuple2<>(1, false),
-         new Tuple2<>(1, false), new Tuple2<>(1, false),
-         new Tuple2<>(1, false), new Tuple2<>(1, false),
-         new Tuple2<>(1, false), new Tuple2<>(1, false));
-
-     return env.fromCollection(elements).rebalance();
- }
-
- /**
-  * Combinable group reducer over {@code (Integer, Boolean)} tuples. Both phases sum
-  * {@code f0}; {@code combine} ANDs the negated flags while {@code reduce} ANDs the flags
-  * as they are.
-  */
- private static class NonKeyedCombReducer implements CombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
-         GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
-
-     @Override
-     public Tuple2<Integer, Boolean> combine(Iterable<Tuple2<Integer, Boolean>> values) throws Exception {
-         return aggregate(values, true);
-     }
-
-     @Override
-     public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
-         out.collect(aggregate(values, false));
-     }
-
-     /** Sums f0 and ANDs the (optionally negated) f1 flags of all values. */
-     private static Tuple2<Integer, Boolean> aggregate(
-             Iterable<Tuple2<Integer, Boolean>> values, boolean negateFlags) {
-         int total = 0;
-         boolean allSet = true;
-         for (Tuple2<Integer, Boolean> value : values) {
-             total += value.f0;
-             allSet &= negateFlags ? !value.f1 : value.f1;
-         }
-         return new Tuple2<>(total, allSet);
-     }
- }
-
- /**
-  * Group-combinable group reducer over {@code (Integer, Boolean)} tuples. Both phases sum
-  * {@code f0}; {@code combine} ANDs the negated flags while {@code reduce} ANDs the flags
-  * as they are.
-  */
- private static class NonKeyedGroupCombReducer implements GroupCombineFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>,
-         GroupReduceFunction<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>> {
-
-     @Override
-     public void reduce(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
-         out.collect(aggregate(values, false));
-     }
-
-     @Override
-     public void combine(Iterable<Tuple2<Integer, Boolean>> values, Collector<Tuple2<Integer, Boolean>> out) throws Exception {
-         out.collect(aggregate(values, true));
-     }
-
-     /** Sums f0 and ANDs the (optionally negated) f1 flags of all values. */
-     private static Tuple2<Integer, Boolean> aggregate(
-             Iterable<Tuple2<Integer, Boolean>> values, boolean negateFlags) {
-         int total = 0;
-         boolean allSet = true;
-         for (Tuple2<Integer, Boolean> value : values) {
-             total += value.f0;
-             allSet &= negateFlags ? !value.f1 : value.f1;
-         }
-         return new Tuple2<>(total, allSet);
-     }
- }
-
- /**
-  * Creates a rebalanced data set of ten {@code (key, 1, false)} tuples: six with key
-  * "k1" and four with key "k2".
-  *
-  * @param env environment the data set is created in
-  * @return the rebalanced input data set
-  */
- private DataSet<Tuple3<String, Integer, Boolean>> createKeyedInput(ExecutionEnvironment env) {
-     List<Tuple3<String, Integer, Boolean>> elements = Arrays.asList(
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k2", 1, false),
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k2", 1, false),
-         new Tuple3<>("k2", 1, false),
-         new Tuple3<>("k1", 1, false),
-         new Tuple3<>("k2", 1, false));
-
-     return env.fromCollection(elements).rebalance();
- }
-
- /**
-  * Key selector that keys a {@code (String, Integer, Boolean)} tuple by its first field.
-  */
- public static class KeySelectorX implements KeySelector<Tuple3<String, Integer, Boolean>, String> {
-     private static final long serialVersionUID = 1L;
-
-     @Override
-     public String getKey(Tuple3<String, Integer, Boolean> in) {
-         final String key = in.f0;
-         return key;
-     }
- }
-
- /**
-  * Combinable group reducer over {@code (key, count, flag)} tuples. Both phases take the
-  * key from the first tuple and sum the counts; {@code combine} ANDs the negated flags
-  * while {@code reduce} ANDs the flags as they are. With the all-{@code false} flags of
-  * the test input, {@code reduce} can therefore only emit {@code true} when it is fed
-  * the output of {@code combine}.
-  *
-  * <p>Declared {@code static} so that instances do not hold a hidden reference to the
-  * enclosing test instance.
-  */
- private static class KeyedCombReducer implements CombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
-         GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
-
-     /**
-      * @param values tuples of one group
-      * @return (key of first tuple, sum of counts, AND of negated flags)
-      */
-     @Override
-     public Tuple3<String, Integer, Boolean> combine(Iterable<Tuple3<String, Integer, Boolean>> values) throws Exception {
-         String key = null;
-         int sum = 0;
-         boolean flag = true;
-
-         for (Tuple3<String, Integer, Boolean> tuple : values) {
-             key = (key == null) ? tuple.f0 : key;
-             sum += tuple.f1;
-             flag &= !tuple.f2;
-         }
-         return new Tuple3<>(key, sum, flag);
-     }
-
-     /**
-      * Emits (key of first tuple, sum of counts, AND of flags) for one group.
-      *
-      * @param values tuples of one group
-      * @param out collector receiving the single result tuple
-      */
-     @Override
-     public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
-         String key = null;
-         int sum = 0;
-         boolean flag = true;
-
-         for (Tuple3<String, Integer, Boolean> tuple : values) {
-             key = (key == null) ? tuple.f0 : key;
-             sum += tuple.f1;
-             flag &= tuple.f2;
-         }
-         out.collect(new Tuple3<>(key, sum, flag));
-     }
- }
-
- /**
-  * Group-combinable group reducer over {@code (key, count, flag)} tuples. Both phases
-  * take the key from the first tuple and sum the counts; {@code combine} ANDs the negated
-  * flags while {@code reduce} ANDs the flags as they are. With the all-{@code false}
-  * flags of the test input, {@code reduce} can therefore only emit {@code true} when it
-  * is fed the output of {@code combine}.
-  *
-  * <p>Declared {@code static} so that instances do not hold a hidden reference to the
-  * enclosing test instance.
-  */
- private static class KeyedGroupCombReducer implements GroupCombineFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>>,
-         GroupReduceFunction<Tuple3<String, Integer, Boolean>, Tuple3<String, Integer, Boolean>> {
-
-     /**
-      * Emits (key of first tuple, sum of counts, AND of negated flags) for one group.
-      *
-      * @param values tuples of one group
-      * @param out collector receiving the single combined tuple
-      */
-     @Override
-     public void combine(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
-         String key = null;
-         int sum = 0;
-         boolean flag = true;
-
-         for (Tuple3<String, Integer, Boolean> tuple : values) {
-             key = (key == null) ? tuple.f0 : key;
-             sum += tuple.f1;
-             flag &= !tuple.f2;
-         }
-         out.collect(new Tuple3<>(key, sum, flag));
-     }
-
-     /**
-      * Emits (key of first tuple, sum of counts, AND of flags) for one group.
-      *
-      * @param values tuples of one group
-      * @param out collector receiving the single result tuple
-      */
-     @Override
-     public void reduce(Iterable<Tuple3<String, Integer, Boolean>> values, Collector<Tuple3<String, Integer, Boolean>> out) throws Exception {
-         String key = null;
-         int sum = 0;
-         boolean flag = true;
-
-         for (Tuple3<String, Integer, Boolean> tuple : values) {
-             key = (key == null) ? tuple.f0 : key;
-             sum += tuple.f1;
-             flag &= tuple.f2;
-         }
-         out.collect(new Tuple3<>(key, sum, flag));
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
deleted file mode 100644
index 85961db..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-@SuppressWarnings("serial")
-public class RemoteEnvironmentITCase extends TestLogger {
-
- private static final int TM_SLOTS = 4;
-
- private static final int USER_DOP = 2;
-
- private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
-
- private static final String VALID_STARTUP_TIMEOUT = "100 s";
-
- private static Configuration configuration;
-
- private static StandaloneMiniCluster cluster;
-
-
- /**
-  * Starts the shared mini cluster with {@code TM_SLOTS} task slots configured.
-  */
- @BeforeClass
- public static void setupCluster() throws Exception {
-     final Configuration clusterConfig = new Configuration();
-     configuration = clusterConfig;
-     clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-
-     cluster = new StandaloneMiniCluster(clusterConfig);
- }
-
- /**
-  * Shuts down the shared mini cluster.
-  *
-  * <p>The null check covers the case where {@code setupCluster()} failed before the
-  * cluster was assigned, so that teardown does not add a NullPointerException on top of
-  * the original failure.
-  */
- @AfterClass
- public static void tearDownCluster() throws Exception {
-     if (cluster != null) {
-         cluster.close();
-     }
- }
-
- /**
-  * Ensure that Akka configuration parameters can be set: a job submitted with the
-  * invalid startup timeout must fail, and the cause of that failure is expected to be a
-  * {@link FlinkException}.
-  */
- @Test(expected = FlinkException.class)
- public void testInvalidAkkaConfiguration() throws Throwable {
-     Configuration config = new Configuration();
-     config.setString(AkkaOptions.STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
-
-     final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-         cluster.getHostname(),
-         cluster.getPort(),
-         config
-     );
-     env.getConfig().disableSysoutLogging();
-
-     DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-     result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
-     try {
-         env.execute();
-         Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-     } catch (ProgramInvocationException ex) {
-         // Surface the wrapped cause for the 'expected' check. If there is no cause,
-         // rethrow the exception itself instead of 'throw null', which would turn into
-         // an unrelated NullPointerException.
-         Throwable cause = ex.getCause();
-         throw (cause != null) ? cause : ex;
-     }
- }
-
- /**
-  * Ensure that the program parallelism can be set even if the configuration is supplied.
-  */
- @Test
- public void testUserSpecificParallelism() throws Exception {
-     Configuration config = new Configuration();
-     config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
-
-     final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-         cluster.getHostname(),
-         cluster.getPort(),
-         config
-     );
-     env.setParallelism(USER_DOP);
-     env.getConfig().disableSysoutLogging();
-
-     // emits exactly one record (the subtask index) per partition
-     RichMapPartitionFunction<Integer, Integer> subtaskIndexEmitter =
-         new RichMapPartitionFunction<Integer, Integer>() {
-             @Override
-             public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
-                 out.collect(getRuntimeContext().getIndexOfThisSubtask());
-             }
-         };
-
-     List<Integer> subtaskIndices = env.createInput(new ParallelismDependentInputFormat())
-         .rebalance()
-         .mapPartition(subtaskIndexEmitter)
-         .collect();
-
-     assertEquals(USER_DOP, subtaskIndices.size());
- }
-
- /**
-  * Input format that asserts it is asked for exactly {@code USER_DOP} input splits and
-  * that returns the single record {@code 1} before reporting the end of its input.
-  */
- private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
-
-     // set once the single record has been handed out
-     private transient boolean emitted;
-
-     @Override
-     public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
-         assertEquals(USER_DOP, numSplits);
-         return super.createInputSplits(numSplits);
-     }
-
-     @Override
-     public boolean reachedEnd() {
-         return emitted;
-     }
-
-     @Override
-     public Integer nextRecord(Integer reuse) {
-         if (!emitted) {
-             emitted = true;
-             return 1;
-         }
-         return null;
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
deleted file mode 100644
index 8cc54aa..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.ReplicatingInputFormat;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.NumberSequenceIterator;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests for replicating DataSources
- */
-
-@RunWith(Parameterized.class)
-public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
-
- public ReplicatingDataSourceITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testReplicatedSourceToJoin() throws Exception {
- /*
- * Test replicated source going into join
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
- (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO)
- .map(new ToTuple());
- DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple());
-
- DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0)
- .projectFirst(0)
- .sum(0);
-
- List<Tuple> result = pairs.collect();
-
- String expectedResult = "(500500)";
-
- compareResultAsText(result, expectedResult);
- }
-
- @Test
- public void testReplicatedSourceToCross() throws Exception {
- /*
- * Test replicated source going into cross
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
- (new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO)
- .map(new ToTuple());
- DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple());
-
- DataSet<Tuple1<Long>> pairs = source1.cross(source2)
- .filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() {
- @Override
- public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
- return value.f0.f0.equals(value.f1.f0);
- }
- })
- .map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() {
- @Override
- public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
- return value.f0;
- }
- })
- .sum(0);
-
- List<Tuple1<Long>> result = pairs.collect();
-
- String expectedResult = "(500500)";
-
- compareResultAsText(result, expectedResult);
- }
-
-
- public static class ToTuple implements MapFunction<Long, Tuple1<Long>> {
-
- @Override
- public Tuple1<Long> map(Long value) throws Exception {
- return new Tuple1<Long>(value);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
deleted file mode 100644
index a9c75e5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SampleITCase.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class SampleITCase extends MultipleProgramsTestBase {
-
- private static final Random RNG = new Random();
-
- public SampleITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Before
- public void initiate() {
- ExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
- }
-
- @Test
- public void testSamplerWithFractionWithoutReplacement() throws Exception {
- verifySamplerWithFractionWithoutReplacement(0d);
- verifySamplerWithFractionWithoutReplacement(0.2d);
- verifySamplerWithFractionWithoutReplacement(1.0d);
- }
-
- @Test
- public void testSamplerWithFractionWithReplacement() throws Exception {
- verifySamplerWithFractionWithReplacement(0d);
- verifySamplerWithFractionWithReplacement(0.2d);
- verifySamplerWithFractionWithReplacement(1.0d);
- verifySamplerWithFractionWithReplacement(2.0d);
- }
-
- @Test
- public void testSamplerWithSizeWithoutReplacement() throws Exception {
- verifySamplerWithFixedSizeWithoutReplacement(0);
- verifySamplerWithFixedSizeWithoutReplacement(2);
- verifySamplerWithFixedSizeWithoutReplacement(21);
- }
-
- @Test
- public void testSamplerWithSizeWithReplacement() throws Exception {
- verifySamplerWithFixedSizeWithReplacement(0);
- verifySamplerWithFixedSizeWithReplacement(2);
- verifySamplerWithFixedSizeWithReplacement(21);
- }
-
- private void verifySamplerWithFractionWithoutReplacement(double fraction) throws Exception {
- verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong());
- }
-
- private void verifySamplerWithFractionWithoutReplacement(double fraction, long seed) throws Exception {
- verifySamplerWithFraction(false, fraction, seed);
- }
-
- private void verifySamplerWithFractionWithReplacement(double fraction) throws Exception {
- verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong());
- }
-
- private void verifySamplerWithFractionWithReplacement(double fraction, long seed) throws Exception {
- verifySamplerWithFraction(true, fraction, seed);
- }
-
- private void verifySamplerWithFraction(boolean withReplacement, double fraction, long seed) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env);
- MapPartitionOperator<String, String> sampled = DataSetUtils.sample(ds, withReplacement, fraction, seed);
- List<String> result = sampled.collect();
- containsResultAsText(result, getSourceStrings());
- }
-
- private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples) throws Exception {
- verifySamplerWithFixedSizeWithoutReplacement(numSamples, RNG.nextLong());
- }
-
- private void verifySamplerWithFixedSizeWithoutReplacement(int numSamples, long seed) throws Exception {
- verifySamplerWithFixedSize(false, numSamples, seed);
- }
-
- private void verifySamplerWithFixedSizeWithReplacement(int numSamples) throws Exception {
- verifySamplerWithFixedSizeWithReplacement(numSamples, RNG.nextLong());
- }
-
- private void verifySamplerWithFixedSizeWithReplacement(int numSamples, long seed) throws Exception {
- verifySamplerWithFixedSize(true, numSamples, seed);
- }
-
- private void verifySamplerWithFixedSize(boolean withReplacement, int numSamples, long seed) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- FlatMapOperator<Tuple3<Integer, Long, String>, String> ds = getSourceDataSet(env);
- DataSet<String> sampled = DataSetUtils.sampleWithSize(ds, withReplacement, numSamples, seed);
- List<String> result = sampled.collect();
- assertEquals(numSamples, result.size());
- containsResultAsText(result, getSourceStrings());
- }
-
- private FlatMapOperator<Tuple3<Integer, Long, String>, String> getSourceDataSet(ExecutionEnvironment env) {
- return CollectionDataSets.get3TupleDataSet(env).flatMap(
- new FlatMapFunction<Tuple3<Integer, Long, String>, String>() {
- @Override
- public void flatMap(Tuple3<Integer, Long, String> value, Collector<String> out) throws Exception {
- out.collect(value.f2);
- }
- });
- }
-
- private String getSourceStrings() {
- return "Hi\n" +
- "Hello\n" +
- "Hello world\n" +
- "Hello world, how are you?\n" +
- "I am fine.\n" +
- "Luke Skywalker\n" +
- "Comment#1\n" +
- "Comment#2\n" +
- "Comment#3\n" +
- "Comment#4\n" +
- "Comment#5\n" +
- "Comment#6\n" +
- "Comment#7\n" +
- "Comment#8\n" +
- "Comment#9\n" +
- "Comment#10\n" +
- "Comment#11\n" +
- "Comment#12\n" +
- "Comment#13\n" +
- "Comment#14\n" +
- "Comment#15\n";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
deleted file mode 100644
index c7f07f6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SortPartitionITCase.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class SortPartitionITCase extends MultipleProgramsTestBase {
-
- public SortPartitionITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testSortPartitionByKeyField() throws Exception {
- /*
- * Test sort partition on key field
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
- .sortPartition(1, Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionByTwoKeyFields() throws Exception {
- /*
- * Test sort partition on two key fields
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
- .sortPartition(4, Order.ASCENDING)
- .sortPartition(2, Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testSortPartitionByFieldExpression() throws Exception {
- /*
- * Test sort partition on field expression
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper()).setParallelism(4) // parallelize input
- .sortPartition("f1", Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionByTwoFieldExpressions() throws Exception {
- /*
- * Test sort partition on two field expressions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>()).setParallelism(2) // parallelize input
- .sortPartition("f4", Order.ASCENDING)
- .sortPartition("f2", Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple5Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionByNestedFieldExpression() throws Exception {
- /*
- * Test sort partition on nested field expressions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>()).setParallelism(3) // parallelize input
- .sortPartition("f0.f1", Order.ASCENDING)
- .sortPartition("f1", Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new NestedTupleChecker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
- /*
- * Test sort partition on field expression
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataSet<POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<POJO>()).setParallelism(1) // parallelize input
- .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
- .sortPartition("number", Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new PojoChecker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionParallelismChange() throws Exception {
- /*
- * Test sort partition with parallelism change
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .sortPartition(1, Order.DESCENDING).setParallelism(3) // change parallelism
- .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionWithKeySelector1() throws Exception {
- /*
- * Test sort partition on an extracted key
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
- .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
- @Override
- public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
- return value.f1;
- }
- }, Order.ASCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple3AscendingChecker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- @Test
- public void testSortPartitionWithKeySelector2() throws Exception {
- /*
- * Test sort partition on an extracted key
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- List<Tuple1<Boolean>> result = ds
- .map(new IdMapper<Tuple3<Integer, Long, String>>()).setParallelism(4) // parallelize input
- .sortPartition(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
- @Override
- public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> value) throws Exception {
- return new Tuple2<>(value.f0, value.f1);
- }
- }, Order.DESCENDING)
- .mapPartition(new OrderCheckMapper<>(new Tuple3Checker()))
- .distinct().collect();
-
- String expected = "(true)\n";
-
- compareResultAsText(result, expected);
- }
-
- public interface OrderChecker<T> extends Serializable {
- boolean inOrder(T t1, T t2);
- }
-
- @SuppressWarnings("serial")
- public static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> {
- @Override
- public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
- return t1.f1 >= t2.f1;
- }
- }
-
- @SuppressWarnings("serial")
- public static class Tuple3AscendingChecker implements OrderChecker<Tuple3<Integer, Long, String>> {
- @Override
- public boolean inOrder(Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
- return t1.f1 <= t2.f1;
- }
- }
-
- @SuppressWarnings("serial")
- public static class Tuple5Checker implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
- @Override
- public boolean inOrder(Tuple5<Integer, Long, Integer, String, Long> t1,
- Tuple5<Integer, Long, Integer, String, Long> t2) {
- return t1.f4 < t2.f4 || t1.f4.equals(t2.f4) && t1.f2 >= t2.f2;
- }
- }
-
- @SuppressWarnings("serial")
- public static class NestedTupleChecker implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
- @Override
- public boolean inOrder(Tuple2<Tuple2<Integer, Integer>, String> t1,
- Tuple2<Tuple2<Integer, Integer>, String> t2) {
- return t1.f0.f1 < t2.f0.f1 ||
- t1.f0.f1.equals(t2.f0.f1) && t1.f1.compareTo(t2.f1) >= 0;
- }
- }
-
- @SuppressWarnings("serial")
- public static class PojoChecker implements OrderChecker<POJO> {
- @Override
- public boolean inOrder(POJO t1, POJO t2) {
- return t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) < 0 ||
- t1.nestedTupleWithCustom.f1.myString.compareTo(t2.nestedTupleWithCustom.f1.myString) == 0 &&
- t1.number >= t2.number;
- }
- }
-
- @SuppressWarnings("unused, serial")
- public static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> {
-
- OrderChecker<T> checker;
-
- public OrderCheckMapper() {}
-
- public OrderCheckMapper(OrderChecker<T> checker) {
- this.checker = checker;
- }
-
- @Override
- public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out) throws Exception {
-
- Iterator<T> it = values.iterator();
- if(!it.hasNext()) {
- out.collect(new Tuple1<>(true));
- } else {
- T last = it.next();
-
- while (it.hasNext()) {
- T next = it.next();
- if (!checker.inOrder(last, next)) {
- out.collect(new Tuple1<>(false));
- return;
- }
- last = next;
- }
- out.collect(new Tuple1<>(true));
- }
- }
- }
-
- @SuppressWarnings("serial")
- public static class IdMapper<T> implements MapFunction<T, T> {
-
- @Override
- public T map(T value) throws Exception {
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
deleted file mode 100644
index e5bdc19..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class SumMinMaxITCase extends MultipleProgramsTestBase {
-
- public SumMinMaxITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testSumMaxAndProject() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> sumDs = ds
- .sum(0)
- .andMax(1)
- .project(0, 1);
-
- List<Tuple2<Integer, Long>> result = sumDs.collect();
-
- String expected = "231,6\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testGroupedAggregate() throws Exception {
- /*
- * Grouped Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
- .sum(0)
- .project(1, 0);
-
- List<Tuple2<Long, Integer>> result = aggregateDs.collect();
-
- String expected = "1,1\n" +
- "2,5\n" +
- "3,15\n" +
- "4,34\n" +
- "5,65\n" +
- "6,111\n";
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testNestedAggregate() throws Exception {
- /*
- * Nested Aggregate
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
- .min(0)
- .min(0)
- .project(0);
-
- List<Tuple1<Integer>> result = aggregateDs.collect();
-
- String expected = "1\n";
-
- compareResultAsTuples(result, expected);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
deleted file mode 100644
index 067939f..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/TypeHintITCase.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class TypeHintITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 9;
-
- private int curProgId = config.getInteger("ProgramId", -1);
-
- public TypeHintITCase(Configuration config) {
- super(config);
- }
-
- @Override
- protected void testProgram() throws Exception {
- TypeHintProgs.runProgram(curProgId);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
- private static class TypeHintProgs {
-
- public static void runProgram(int progId) throws Exception {
- switch(progId) {
- // Test identity map with missing types and string type hint
- case 1: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns("Tuple3<Integer, Long, String>");
- List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test identity map with missing types and type information type hint
- case 2: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- // all following generics get erased during compilation
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
- List<Tuple3<Integer, Long, String>> result = identityMapDs
- .collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat map with class type hint
- case 3: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> identityMapDs = ds
- .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
- .returns(Integer.class);
- List<Integer> result = identityMapDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test join with type information type hint
- case 4: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat join with type information type hint
- case 5: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test unsorted group reduce with type information type hint
- case 6: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test sorted group reduce with type information type hint
- case 7: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .sortGroup(0, Order.ASCENDING)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test combine group with type information type hint
- case 8: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test cogroup with type information type hint
- case 9: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .coGroup(ds2)
- .where(0)
- .equalTo(0)
- .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public static class Mapper<T, V> implements MapFunction<T, V> {
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("unchecked")
- @Override
- public V map(T value) throws Exception {
- return (V) value;
- }
- }
-
- public static class FlatMapper<T, V> implements FlatMapFunction<T, V> {
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void flatMap(T value, Collector<V> out) throws Exception {
- out.collect((V) ((Tuple3)value).f0);
- }
- }
-
- public static class Joiner<IN1, IN2, OUT> implements JoinFunction<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public OUT join(IN1 first, IN2 second) throws Exception {
- return (OUT) ((Tuple3) first).f0;
- }
- }
-
- public static class FlatJoiner<IN1, IN2, OUT> implements FlatJoinFunction<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception {
- out.collect((OUT) ((Tuple3) first).f0);
- }
- }
-
- public static class GroupReducer<IN, OUT> implements GroupReduceFunction<IN, OUT> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception {
- out.collect((OUT) ((Tuple3) values.iterator().next()).f0);
- }
- }
-
- public static class GroupCombiner<IN, OUT> implements GroupCombineFunction<IN, OUT> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void combine(Iterable<IN> values, Collector<OUT> out) throws Exception {
- out.collect((OUT) ((Tuple3) values.iterator().next()).f0);
- }
- }
-
- public static class CoGrouper<IN1, IN2, OUT> implements CoGroupFunction<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception {
- out.collect((OUT) ((Tuple3) first.iterator().next()).f0);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
deleted file mode 100644
index 7ab2764..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class UnionITCase extends MultipleProgramsTestBase {
-
- private static final String FULL_TUPLE_3_STRING = "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n" +
- "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" +
- "8,4,Comment#2\n" +
- "9,4,Comment#3\n" +
- "10,4,Comment#4\n" +
- "11,5,Comment#5\n" +
- "12,5,Comment#6\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" +
- "15,5,Comment#9\n" +
- "16,6,Comment#10\n" +
- "17,6,Comment#11\n" +
- "18,6,Comment#12\n" +
- "19,6,Comment#13\n" +
- "20,6,Comment#14\n" +
- "21,6,Comment#15\n";
-
- public UnionITCase(TestExecutionMode mode){
- super(mode);
- }
-
- @Test
- public void testUnion2IdenticalDataSets() throws Exception {
- /*
- * Union of 2 Same Data Sets
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env));
-
- List<Tuple3<Integer, Long, String>> result = unionDs.collect();
-
- String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testUnion5IdenticalDataSets() throws Exception {
- /*
- * Union of 5 same Data Sets, with multiple unions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env))
- .union(CollectionDataSets.get3TupleDataSet(env));
-
- List<Tuple3<Integer, Long, String>> result = unionDs.collect();
-
- String expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
- + FULL_TUPLE_3_STRING +
- FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING;
-
- compareResultAsTuples(result, expected);
- }
-
- @Test
- public void testUnionWithEmptyDataSet() throws Exception {
- /*
- * Test on union with empty dataset
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // Don't know how to make an empty result in an other way than filtering it
- DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
- filter(new RichFilter1());
-
- DataSet<Tuple3<Integer, Long, String>> unionDs = CollectionDataSets.get3TupleDataSet(env)
- .union(empty);
-
- List<Tuple3<Integer, Long, String>> result = unionDs.collect();
-
- String expected = FULL_TUPLE_3_STRING;
-
- compareResultAsTuples(result, expected);
- }
-
- public static class RichFilter1 extends RichFilterFunction<Tuple3<Integer,Long,String>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Tuple3<Integer, Long, String> value) throws Exception {
- return false;
- }
- }
-
-}