You are viewing a plain-text version of this content; the link to the canonical version did not survive this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:08 UTC
[72/82] [abbrv] incubator-flink git commit: Change integration tests
to reuse cluster in order to save startup and shutdown time.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 5f8de8a..f10a9df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -18,10 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
@@ -36,659 +33,636 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
-public class JoinITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 23;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class JoinITCase extends MultipleProgramsTestBase {
+
+ public JoinITCase(MultipleProgramsTestBase.ExecutionMode mode){ // called by the Parameterized runner; the mode values are presumably declared in MultipleProgramsTestBase
+ super(mode); // hand the execution mode to the base class
+ }
+
private String resultPath;
- private String expectedResult;
-
- public JoinITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{ // runs before each test method
+ resultPath = tempFolder.newFile().toURI().toString(); // NOTE(review): newFile() already creates the file, so writeAsCsv has to be allowed to overwrite it — confirm the test setup permits that
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{ // JUnit runs this after every test, including tests that threw
+ compareResultsByLinesInMemory(expected, resultPath); // NOTE(review): `expected` is still null here if the test failed before assigning it
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = JoinProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testUDFJoinOnTuplesWithKeyFieldPositions() throws Exception {
+ /*
+ * UDF Join on tuples with key field positions: ds1 field 1 is matched against ds2 field 1
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(1)
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+ // `expected` is compared against the written output in after()
+ expected = "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testeUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
+ /*
+ * UDF Join on tuples with multiple key field positions: ds1 (f0, f1) is matched against ds2 (f0, f4)
+ */
+ // NOTE(review): the 'teste' prefix of this method name looks like a typo for 'test'
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(0,1)
+ .equalTo(0,4)
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" +
+ "I am fine.,HIJ\n" +
+ "I am fine.,IJK\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testDefaultJoinOnTuples() throws Exception {
+ /*
+ * Default Join on tuples: without a join function each match is emitted as a Tuple2 of both records
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // ds1 field 0 (Integer) is matched against ds2 field 2 (Integer)
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Tuple3<Integer, Long, String>,Tuple5<Integer, Long, Integer, String, Long>>> joinDs =
+ ds1.join(ds2)
+ .where(0)
+ .equalTo(2);
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
+ "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
+ "(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
}
-
- private static class JoinProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
-
- /*
- * UDF Join on tuples with key field positions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
+
+ @Test
+ public void testJoinWithHuge() throws Exception {
+ /*
+ * Join with Huge: same inputs, keys and expected output as testUDFJoinOnTuplesWithKeyFieldPositions
+ */
+ // joinWithHuge only hints that ds2 is the much larger input; the result must equal a plain join
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
+ .where(1)
+ .equalTo(1)
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt\n";
+ }
+
+ @Test
+ public void testJoinWithTiny() throws Exception {
+ /*
+ * Join with Tiny: same inputs, keys and expected output as testJoinWithHuge
+ */
+ // joinWithTiny only hints that ds2 is the much smaller input; the result must equal a plain join
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.joinWithTiny(ds2)
.where(1)
.equalTo(1)
.with(new T3T5FlatJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo\n" +
- "Hello,Hallo Welt\n" +
- "Hello world,Hallo Welt\n";
-
- }
- case 2: {
-
- /*
- * UDF Join on tuples with multiple key field positions
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
- .where(0,1)
- .equalTo(0,4)
- .with(new T3T5FlatJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo\n" +
- "Hello,Hallo Welt\n" +
- "Hello world,Hallo Welt wie gehts?\n" +
- "Hello world,ABC\n" +
- "I am fine.,HIJ\n" +
- "I am fine.,IJK\n";
-
- }
- case 3: {
-
- /*
- * Default Join on tuples
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<Tuple3<Integer, Long, String>,Tuple5<Integer, Long, Integer, String, Long>>> joinDs =
- ds1.join(ds2)
- .where(0)
- .equalTo(2);
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "(1,1,Hi),(2,2,1,Hallo Welt,2)\n" +
- "(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
- "(3,2,Hello world),(3,4,3,Hallo Welt wie gehts?,2)\n";
-
- }
- case 4: {
-
- /*
- * Join with Huge
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
- .where(1)
- .equalTo(1)
- .with(new T3T5FlatJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo\n" +
- "Hello,Hallo Welt\n" +
- "Hello world,Hallo Welt\n";
-
- }
- case 5: {
-
- /*
- * Join with Tiny
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.joinWithTiny(ds2)
- .where(1)
- .equalTo(1)
- .with(new T3T5FlatJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo\n" +
- "Hello,Hallo Welt\n" +
- "Hello world,Hallo Welt\n";
-
- }
-
- case 6: {
-
- /*
- * Join that returns the left input object
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> joinDs =
- ds1.join(ds2)
- .where(1)
- .equalTo(1)
- .with(new LeftReturningJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n";
- }
- case 7: {
-
- /*
- * Join that returns the right input object
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> joinDs =
- ds1.join(ds2)
- .where(1)
- .equalTo(1)
- .with(new RightReturningJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,Hallo,1\n" +
- "2,2,1,Hallo Welt,2\n" +
- "2,2,1,Hallo Welt,2\n";
- }
- case 8: {
-
- /*
- * Join with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
- DataSet<Tuple3<String, String, Integer>> joinDs =
- ds1.join(ds2)
- .where(1)
- .equalTo(4)
- .with(new T3T5BCJoin())
- .withBroadcastSet(intDs, "ints");
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo,55\n" +
- "Hi,Hallo Welt wie,55\n" +
- "Hello,Hallo Welt,55\n" +
- "Hello world,Hallo Welt,55\n";
- }
- case 9: {
-
- /*
- * Join on a tuple input with key field selector and a custom type input with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
- .where(new KeySelector<CustomType, Integer>() {
- @Override
- public Integer getKey(CustomType value) {
- return value.myInt;
- }
- }
- )
- .equalTo(0)
- .with(new CustT3Join());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hi\n" +
- "Hello,Hello\n" +
- "Hello world,Hello\n";
-
- }
- case 10: {
-
- /*
- * Project join on a tuple input 1
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple6<String, Long, String, Integer, Long, Long>> joinDs =
- ds1.join(ds2)
- .where(1)
- .equalTo(1)
- .projectFirst(2,1)
- .projectSecond(3)
- .projectFirst(0)
- .projectSecond(4,1);
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,1,Hallo,1,1,1\n" +
- "Hello,2,Hallo Welt,2,2,2\n" +
- "Hello world,2,Hallo Welt,3,2,2\n";
-
- }
- case 11: {
-
- /*
- * Project join on a tuple input 2
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple6<String, String, Long, Long, Long, Integer>> joinDs =
- ds1.join(ds2)
- .where(1)
- .equalTo(1)
- .projectSecond(3)
- .projectFirst(2,1)
- .projectSecond(4,1)
- .projectFirst(0);
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hallo,Hi,1,1,1,1\n" +
- "Hallo Welt,Hello,2,2,2,2\n" +
- "Hallo Welt,Hello world,2,2,2,3\n";
- }
-
- case 12: {
-
- /*
- * Join on a tuple input with key field selector and a custom type input with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
- .where(1).equalTo(new KeySelector<CustomType, Long>() {
- @Override
- public Long getKey(CustomType value) {
- return value.myLong;
- }
- })
- .with(new T3CustJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hello\n" +
- "Hello,Hello world\n" +
- "Hello world,Hello world\n";
-
- }
-
- case 13: {
-
- /*
- * (Default) Join on two custom type inputs with key extractors
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-
- DataSet<Tuple2<CustomType, CustomType>> joinDs =
- ds1.join(ds2)
- .where(
- new KeySelector<CustomType, Integer>() {
- @Override
- public Integer getKey(CustomType value) {
- return value.myInt;
- }
- }
- )
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt\n";
+ }
+
+ @Test
+ public void testJoinThatReturnsTheLeftInputObject() throws Exception {
+ /*
+ * Join that returns the left input object
+ */
+ // every expected line is a ds1 record, one per matching (ds1, ds2) pair
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(1)
+ .with(new LeftReturningJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n";
+ }
+
+ @Test
+ public void testJoinThatReturnsTheRightInputObject() throws Exception {
+ /*
+ * Join that returns the right input object
+ */
+ // every expected line is a ds2 record; "2,2,1,Hallo Welt,2" appears twice because two ds1 records have field 1 == 2
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(1)
+ .with(new RightReturningJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,0,Hallo,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "2,2,1,Hallo Welt,2\n";
+ }
+
+ @Test
+ public void testJoinWithBroadcastSet() throws Exception {
+ /*
+ * Join with broadcast set
+ */
+ // NOTE(review): the constant 55 in every expected row presumably comes from the "ints" broadcast set read by T3T5BCJoin — confirm against its definition
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
+ DataSet<Tuple3<String, String, Integer>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(4)
+ .with(new T3T5BCJoin())
+ .withBroadcastSet(intDs, "ints");
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hallo,55\n" +
+ "Hi,Hallo Welt wie,55\n" +
+ "Hello,Hallo Welt,55\n" +
+ "Hello world,Hallo Welt,55\n";
+ }
+
+ @Test
+ public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector()
+ throws Exception{
+ /*
+ * Join on a custom type input with key extractor and a tuple input with key field selector
+ */
+ // ds1 is keyed by KeySelector1 (CustomType.myInt), ds2 by tuple field 0
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(new KeySelector1())
+ .equalTo(0)
+ .with(new CustT3Join());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hi\n" +
+ "Hello,Hello\n" +
+ "Hello world,Hello\n";
+
+ }
+
+ public static class KeySelector1 implements KeySelector<CustomType, Integer> { // join key: CustomType.myInt; NOTE(review): same logic as KeySelector5 and KeySelector6
+ @Override
+ public Integer getKey(CustomType value) {
+ return value.myInt;
+ }
+ }
+
+ @Test
+ public void testProjectOnATuple1Input() throws Exception {
+ /*
+ * Project join on a tuple input 1
+ */
+ // output = (ds1.f2, ds1.f1, ds2.f3, ds1.f0, ds2.f4, ds2.f1)
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // NOTE(review): unlike testProjectJoinOnATuple2Input, this method name omits 'Join'
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple6<String, Long, String, Integer, Long, Long>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(1)
+ .projectFirst(2,1)
+ .projectSecond(3)
+ .projectFirst(0)
+ .projectSecond(4,1);
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,1,Hallo,1,1,1\n" +
+ "Hello,2,Hallo Welt,2,2,2\n" +
+ "Hello world,2,Hallo Welt,3,2,2\n";
+ }
+
+ @Test
+ public void testProjectJoinOnATuple2Input() throws Exception {
+ /*
+ * Project join on a tuple input 2
+ */
+ // output = (ds2.f3, ds1.f2, ds1.f1, ds2.f4, ds2.f1, ds1.f0)
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple6<String, String, Long, Long, Long, Integer>> joinDs =
+ ds1.join(ds2)
+ .where(1)
+ .equalTo(1)
+ .projectSecond(3)
+ .projectFirst(2,1)
+ .projectSecond(4,1)
+ .projectFirst(0);
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hallo,Hi,1,1,1,1\n" +
+ "Hallo Welt,Hello,2,2,2,2\n" +
+ "Hallo Welt,Hello world,2,2,2,3\n";
+ }
+
+ @Test
+ public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+ throws Exception {
+ /*
+ * Join on a tuple input with key field selector and a custom type input with key extractor
+ */
+ // ds1 field 1 (Long) is matched against CustomType.myLong extracted by KeySelector2
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(1).equalTo(new KeySelector2())
+ .with(new T3CustJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hello\n" +
+ "Hello,Hello world\n" +
+ "Hello world,Hello world\n";
+ }
+
+ public static class KeySelector2 implements KeySelector<CustomType, Long> { // join key: CustomType.myLong
+ @Override
+ public Long getKey(CustomType value) {
+ return value.myLong;
+ }
+ }
+
+ @Test
+ public void testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
+ /*
+ * (Default) Join on two custom type inputs with key extractors
+ */
+ // both inputs are keyed on CustomType.myInt (KeySelector5 for ds1, KeySelector6 for ds2)
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds1 = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+
+ DataSet<Tuple2<CustomType, CustomType>> joinDs =
+ ds1.join(ds2)
+ .where(
+ new KeySelector5()
+ )
.equalTo(
- new KeySelector<CustomType, Integer>() {
- @Override
- public Integer getKey(CustomType value) {
- return value.myInt;
- }
- }
- );
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,Hi,1,0,Hi\n" +
- "2,1,Hello,2,1,Hello\n" +
- "2,1,Hello,2,2,Hello world\n" +
- "2,2,Hello world,2,1,Hello\n" +
- "2,2,Hello world,2,2,Hello world\n";
-
- }
- case 14: {
- /*
- * UDF Join on tuples with tuple-returning key selectors
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
- .where(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f1);
- }
- })
- .equalTo(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
- return new Tuple2<Integer, Long>(t.f0, t.f4);
- }
- })
- .with(new T3T5FlatJoin());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hallo\n" +
- "Hello,Hallo Welt\n" +
- "Hello world,Hallo Welt wie gehts?\n" +
- "Hello world,ABC\n" +
- "I am fine.,HIJ\n" +
- "I am fine.,IJK\n";
- }
- /**
- * Joins with POJOs
- */
- case 15: {
- /*
- * Join nested pojo against tuple (selected using a string)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
- ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
- "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
- "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
- }
-
- case 16: {
- /*
- * Join nested pojo against tuple (selected as an integer)
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
- ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
- "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
- "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
- }
- case 17: {
- /*
- * selecting multiple fields using expression language
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
- ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
- "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
- "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
- }
- case 18: {
- /*
- * nested into tuple
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
- ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
- "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
- "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
- }
- case 19: {
- /*
- * nested into tuple into pojo
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
- DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
- ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
- "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
- "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
- }
- case 20: {
- /*
- * Non-POJO test to verify that full-tuple keys are working.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
- DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
- DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
- ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "((1,1),one),((1,1),one)\n" +
- "((2,2),two),((2,2),two)\n" +
- "((3,3),three),((3,3),three)\n";
-
- }
- case 21: {
- /*
- * Non-POJO test to verify "nested" tuple-element selection.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
- DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
- DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
- ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "((1,1),one),((1,1),one)\n" +
- "((2,2),two),((2,2),two)\n" +
- "((3,3),three),((3,3),three)\n";
-
- }
- case 22: {
- /*
- * full pojo with full tuple
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
- DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
- DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String> >> joinDs =
- ds1.join(ds2).where("*").equalTo("*");
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
- "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
- "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
- }
- case 23: {
- /*
- * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>> ds2 = ds1.join(ds1).where(0).equalTo(0);
- DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
- ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
-
- joinDs.writeAsCsv(resultPath);
- env.setDegreeOfParallelism(1);
- env.execute();
-
- // return expected result
- return "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
- "((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
- "((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
-
- }
- default:
- throw new IllegalArgumentException("Invalid program id: "+progId);
- }
-
+ new KeySelector6()
+ );
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,0,Hi,1,0,Hi\n" +
+ "2,1,Hello,2,1,Hello\n" +
+ "2,1,Hello,2,2,Hello world\n" +
+ "2,2,Hello world,2,1,Hello\n" +
+ "2,2,Hello world,2,2,Hello world\n";
+ }
+
+ public static class KeySelector5 implements KeySelector<CustomType, Integer> { // join key: CustomType.myInt (same logic as KeySelector1 and KeySelector6)
+ @Override
+ public Integer getKey(CustomType value) {
+ return value.myInt;
}
-
}
-
+
+ public static class KeySelector6 implements KeySelector<CustomType, Integer> {
+ @Override
+ public Integer getKey(CustomType value) {
+ return value.myInt;
+ }
+ }
+
+ @Test
+ public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
+ /*
+ * UDF Join on tuples with tuple-returning key selectors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(new KeySelector3())
+ .equalTo(new KeySelector4())
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" +
+ "I am fine.,HIJ\n" +
+ "I am fine.,IJK\n";
+ }
+
+ public static class KeySelector3 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }
+
+ public static class KeySelector4 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }
+
+ @Test
+ public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
+ /*
+ * Join nested pojo against tuple (selected using a string)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+ ds1.join(ds2).where("nestedPojo.longNumber").equalTo("f6");
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+ "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+ "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+ }
+
+ @Test
+ public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
+ /*
+ * Join nested pojo against tuple (selected as an integer)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+ ds1.join(ds2).where("nestedPojo.longNumber").equalTo(6); // <--- difference!
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+ "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+ "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+ }
+
+ @Test
+ public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
+ /*
+ * selecting multiple fields using expression language
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+ ds1.join(ds2).where("nestedPojo.longNumber", "number", "str").equalTo("f6","f0","f1");
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+ "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+ "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+ }
+
+ @Test
+ public void testNestedIntoTuple() throws Exception {
+ /*
+ * nested into tuple
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+ ds1.join(ds2).where("nestedPojo.longNumber", "number","nestedTupleWithCustom.f0").equalTo("f6","f0","f2");
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+ "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+ "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+ }
+
+ @Test
+ public void testNestedIntoTupleIntoPojo() throws Exception {
+ /*
+ * nested into tuple into pojo
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
+ DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long> >> joinDs =
+ ds1.join(ds2).where("nestedTupleWithCustom.f0","nestedTupleWithCustom.f1.myInt","nestedTupleWithCustom.f1.myLong").equalTo("f2","f3","f4");
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
+ "2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
+ "3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
+ }
+
+ @Test
+ public void testNonPojoToVerifyFullTupleKeys() throws Exception {
+ /*
+ * Non-POJO test to verify that full-tuple keys are working.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+ DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+ DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
+ ds1.join(ds2).where(0).equalTo("f0.f0", "f0.f1"); // key is now Tuple2<Integer, Integer>
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "((1,1),one),((1,1),one)\n" +
+ "((2,2),two),((2,2),two)\n" +
+ "((3,3),three),((3,3),three)\n";
+
+ }
+
+ @Test
+ public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
+ /*
+ * Non-POJO test to verify "nested" tuple-element selection.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+ DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
+ DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String> >> joinDs =
+ ds1.join(ds2).where("f0.f0").equalTo("f0.f0"); // key is now Integer from Tuple2<Integer, Integer>
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "((1,1),one),((1,1),one)\n" +
+ "((2,2),two),((2,2),two)\n" +
+ "((3,3),three),((3,3),three)\n";
+ }
+
+ @Test
+ public void testFullPojoWithFullTuple() throws Exception {
+ /*
+ * full pojo with full tuple
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
+ DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
+ DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String> >> joinDs =
+ ds1.join(ds2).where("*").equalTo("*");
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n"+
+ "2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
+ "3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
+ }
+
+ @Test
+ public void testNonPojoToVerifyNestedTupleElementSelectionWithFirstKeyFieldGreaterThanZero()
+ throws Exception {
+ /*
+ * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>> ds2 = ds1.join(ds1).where(0).equalTo(0);
+ DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
+ ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
+
+ joinDs.writeAsCsv(resultPath);
+ env.setDegreeOfParallelism(1);
+ env.execute();
+
+ expected = "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
+ "((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
+ "((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
+ }
+
public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index f26a1e7..4fbb53d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -18,11 +18,10 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.Collection;
-import java.util.LinkedList;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
import org.junit.Assert;
import org.apache.flink.api.common.functions.MapFunction;
@@ -31,461 +30,461 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
-public class MapITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 9;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class MapITCase extends MultipleProgramsTestBase {
+
+ public MapITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public MapITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = MapProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testIdentityMapWithBasicType() throws Exception {
+ /*
+ * Test identity map with basic type
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+ DataSet<String> identityMapDs = ds.
+ map(new Mapper1());
+
+ identityMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "Hi\n" +
+ "Hello\n" +
+ "Hello world\n" +
+ "Hello world, how are you?\n" +
+ "I am fine.\n" +
+ "Luke Skywalker\n" +
+ "Random comment\n" +
+ "LOL\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ public static class Mapper1 implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ @Test
+ public void testIdentityMapWithTuple() throws Exception {
+ /*
+ * Test identity map with a tuple
+ */
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
+ map(new Mapper2());
+
+ identityMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n" +
+ "2,2,Hello\n" +
+ "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" +
+ "5,3,I am fine.\n" +
+ "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" +
+ "8,4,Comment#2\n" +
+ "9,4,Comment#3\n" +
+ "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" +
+ "12,5,Comment#6\n" +
+ "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" +
+ "15,5,Comment#9\n" +
+ "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" +
+ "18,6,Comment#12\n" +
+ "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" +
+ "21,6,Comment#15\n";
+ }
+
+ public static class Mapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+ throws Exception {
+ return value;
}
-
- return toParameterList(tConfigs);
}
-
- private static class MapProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
- /*
- * Test identity map with basic type
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
- DataSet<String> identityMapDs = ds.
- map(new MapFunction<String, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(String value) throws Exception {
- return value;
- }
- });
-
- identityMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "Hi\n" +
- "Hello\n" +
- "Hello world\n" +
- "Hello world, how are you?\n" +
- "I am fine.\n" +
- "Luke Skywalker\n" +
- "Random comment\n" +
- "LOL\n";
- }
- case 2: {
- /*
- * Test identity map with a tuple
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
- map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
- throws Exception {
- return value;
- }
- });
-
- identityMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "2,2,Hello\n" +
- "3,2,Hello world\n" +
- "4,3,Hello world, how are you?\n" +
- "5,3,I am fine.\n" +
- "6,3,Luke Skywalker\n" +
- "7,4,Comment#1\n" +
- "8,4,Comment#2\n" +
- "9,4,Comment#3\n" +
- "10,4,Comment#4\n" +
- "11,5,Comment#5\n" +
- "12,5,Comment#6\n" +
- "13,5,Comment#7\n" +
- "14,5,Comment#8\n" +
- "15,5,Comment#9\n" +
- "16,6,Comment#10\n" +
- "17,6,Comment#11\n" +
- "18,6,Comment#12\n" +
- "19,6,Comment#13\n" +
- "20,6,Comment#14\n" +
- "21,6,Comment#15\n";
- }
- case 3: {
- /*
- * Test type conversion mapper (Custom -> Tuple)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
- map(new MapFunction<CustomType, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
- private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-
- @Override
- public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
- out.setField(value.myInt, 0);
- out.setField(value.myLong, 1);
- out.setField(value.myString, 2);
- return out;
- }
- });
-
- typeConversionMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,Hi\n" +
- "2,1,Hello\n" +
- "2,2,Hello world\n" +
- "3,3,Hello world, how are you?\n" +
- "3,4,I am fine.\n" +
- "3,5,Luke Skywalker\n" +
- "4,6,Comment#1\n" +
- "4,7,Comment#2\n" +
- "4,8,Comment#3\n" +
- "4,9,Comment#4\n" +
- "5,10,Comment#5\n" +
- "5,11,Comment#6\n" +
- "5,12,Comment#7\n" +
- "5,13,Comment#8\n" +
- "5,14,Comment#9\n" +
- "6,15,Comment#10\n" +
- "6,16,Comment#11\n" +
- "6,17,Comment#12\n" +
- "6,18,Comment#13\n" +
- "6,19,Comment#14\n" +
- "6,20,Comment#15\n";
- }
- case 4: {
- /*
- * Test type conversion mapper (Tuple -> Basic)
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<String> typeConversionMapDs = ds.
- map(new MapFunction<Tuple3<Integer, Long, String>, String>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Tuple3<Integer, Long, String> value) throws Exception {
- return value.getField(2);
- }
- });
-
- typeConversionMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "Hi\n" + "Hello\n" + "Hello world\n" +
- "Hello world, how are you?\n" +
- "I am fine.\n" + "Luke Skywalker\n" +
- "Comment#1\n" + "Comment#2\n" +
- "Comment#3\n" + "Comment#4\n" +
- "Comment#5\n" + "Comment#6\n" +
- "Comment#7\n" + "Comment#8\n" +
- "Comment#9\n" + "Comment#10\n" +
- "Comment#11\n" + "Comment#12\n" +
- "Comment#13\n" + "Comment#14\n" +
- "Comment#15\n";
- }
- case 5: {
- /*
- * Test mapper on tuple - Increment Integer field, reorder second and third fields
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
- map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>>() {
- private static final long serialVersionUID = 1L;
- private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>();
-
- @Override
- public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value)
- throws Exception {
- Integer incr = Integer.valueOf(value.f0.intValue() + 1);
- out.setFields(incr, value.f2, value.f1);
- return out;
- }
- });
-
- tupleMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "2,Hi,1\n" +
- "3,Hello,2\n" +
- "4,Hello world,2\n" +
- "5,Hello world, how are you?,3\n" +
- "6,I am fine.,3\n" +
- "7,Luke Skywalker,3\n" +
- "8,Comment#1,4\n" +
- "9,Comment#2,4\n" +
- "10,Comment#3,4\n" +
- "11,Comment#4,4\n" +
- "12,Comment#5,5\n" +
- "13,Comment#6,5\n" +
- "14,Comment#7,5\n" +
- "15,Comment#8,5\n" +
- "16,Comment#9,5\n" +
- "17,Comment#10,6\n" +
- "18,Comment#11,6\n" +
- "19,Comment#12,6\n" +
- "20,Comment#13,6\n" +
- "21,Comment#14,6\n" +
- "22,Comment#15,6\n";
- }
- case 6: {
- /*
- * Test mapper on Custom - lowercase myString
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> customMapDs = ds.
- map(new MapFunction<CustomType, CustomType>() {
- private static final long serialVersionUID = 1L;
- private final CustomType out = new CustomType();
-
- @Override
- public CustomType map(CustomType value) throws Exception {
- out.myInt = value.myInt;
- out.myLong = value.myLong;
- out.myString = value.myString.toLowerCase();
- return out;
- }
- });
-
- customMapDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,hi\n" +
- "2,1,hello\n" +
- "2,2,hello world\n" +
- "3,3,hello world, how are you?\n" +
- "3,4,i am fine.\n" +
- "3,5,luke skywalker\n" +
- "4,6,comment#1\n" +
- "4,7,comment#2\n" +
- "4,8,comment#3\n" +
- "4,9,comment#4\n" +
- "5,10,comment#5\n" +
- "5,11,comment#6\n" +
- "5,12,comment#7\n" +
- "5,13,comment#8\n" +
- "5,14,comment#9\n" +
- "6,15,comment#10\n" +
- "6,16,comment#11\n" +
- "6,17,comment#12\n" +
- "6,18,comment#13\n" +
- "6,19,comment#14\n" +
- "6,20,comment#15\n";
- }
- case 7: {
- /*
- * Test mapper if UDF returns input object - increment first field of a tuple
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
- map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
- throws Exception {
- Integer incr = Integer.valueOf(value.f0.intValue() + 1);
- value.setField(incr, 0);
- return value;
- }
- });
-
- inputObjMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "2,1,Hi\n" +
- "3,2,Hello\n" +
- "4,2,Hello world\n" +
- "5,3,Hello world, how are you?\n" +
- "6,3,I am fine.\n" +
- "7,3,Luke Skywalker\n" +
- "8,4,Comment#1\n" +
- "9,4,Comment#2\n" +
- "10,4,Comment#3\n" +
- "11,4,Comment#4\n" +
- "12,5,Comment#5\n" +
- "13,5,Comment#6\n" +
- "14,5,Comment#7\n" +
- "15,5,Comment#8\n" +
- "16,5,Comment#9\n" +
- "17,6,Comment#10\n" +
- "18,6,Comment#11\n" +
- "19,6,Comment#12\n" +
- "20,6,Comment#13\n" +
- "21,6,Comment#14\n" +
- "22,6,Comment#15\n";
- }
- case 8: {
- /*
- * Test map with broadcast set
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
- map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
- private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
- private Integer f2Replace = 0;
-
- @Override
- public void open(Configuration config) {
- Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
- int sum = 0;
- for(Integer i : ints) {
- sum += i;
- }
- f2Replace = sum;
- }
-
- @Override
- public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
- throws Exception {
- out.setFields(f2Replace, value.f1, value.f2);
- return out;
- }
- }).withBroadcastSet(ints, "ints");
- bcMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "55,1,Hi\n" +
- "55,2,Hello\n" +
- "55,2,Hello world\n" +
- "55,3,Hello world, how are you?\n" +
- "55,3,I am fine.\n" +
- "55,3,Luke Skywalker\n" +
- "55,4,Comment#1\n" +
- "55,4,Comment#2\n" +
- "55,4,Comment#3\n" +
- "55,4,Comment#4\n" +
- "55,5,Comment#5\n" +
- "55,5,Comment#6\n" +
- "55,5,Comment#7\n" +
- "55,5,Comment#8\n" +
- "55,5,Comment#9\n" +
- "55,6,Comment#10\n" +
- "55,6,Comment#11\n" +
- "55,6,Comment#12\n" +
- "55,6,Comment#13\n" +
- "55,6,Comment#14\n" +
- "55,6,Comment#15\n";
- }
- case 9: {
- /*
- * Test passing configuration object.
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- Configuration conf = new Configuration();
- final String testKey = "testVariable";
- final int testValue = 666;
- conf.setInteger(testKey, testValue);
- DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
- map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void open(Configuration config) {
- int val = config.getInteger(testKey, -1);
- Assert.assertEquals(testValue, val);
- }
-
- @Override
- public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
- return value;
- }
- }).withParameters(conf);
- bcMapDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n"
- + "2,2,Hello\n"
- + "3,2,Hello world";
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
+
+ @Test
+ public void testTypeConversionMapperCustomToTuple() throws Exception {
+ /*
+ * Test type conversion mapper (Custom -> Tuple)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
+ map(new Mapper3());
+
+ typeConversionMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,0,Hi\n" +
+ "2,1,Hello\n" +
+ "2,2,Hello world\n" +
+ "3,3,Hello world, how are you?\n" +
+ "3,4,I am fine.\n" +
+ "3,5,Luke Skywalker\n" +
+ "4,6,Comment#1\n" +
+ "4,7,Comment#2\n" +
+ "4,8,Comment#3\n" +
+ "4,9,Comment#4\n" +
+ "5,10,Comment#5\n" +
+ "5,11,Comment#6\n" +
+ "5,12,Comment#7\n" +
+ "5,13,Comment#8\n" +
+ "5,14,Comment#9\n" +
+ "6,15,Comment#10\n" +
+ "6,16,Comment#11\n" +
+ "6,17,Comment#12\n" +
+ "6,18,Comment#13\n" +
+ "6,19,Comment#14\n" +
+ "6,20,Comment#15\n";
+ }
+
+ public static class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+
+ @Override
+ public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
+ out.setField(value.myInt, 0);
+ out.setField(value.myLong, 1);
+ out.setField(value.myString, 2);
+ return out;
+ }
+ }
+
+ @Test
+ public void testTypeConversionMapperTupleToBasic() throws Exception {
+ /*
+ * Test type conversion mapper (Tuple -> Basic)
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<String> typeConversionMapDs = ds.
+ map(new Mapper4());
+
+ typeConversionMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+ "Hello world, how are you?\n" +
+ "I am fine.\n" + "Luke Skywalker\n" +
+ "Comment#1\n" + "Comment#2\n" +
+ "Comment#3\n" + "Comment#4\n" +
+ "Comment#5\n" + "Comment#6\n" +
+ "Comment#7\n" + "Comment#8\n" +
+ "Comment#9\n" + "Comment#10\n" +
+ "Comment#11\n" + "Comment#12\n" +
+ "Comment#13\n" + "Comment#14\n" +
+ "Comment#15\n";
+ }
+
+ public static class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(Tuple3<Integer, Long, String> value) throws Exception {
+ return value.getField(2);
+ }
+ }
+
+ @Test
+ public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws
+ Exception {
+ /*
+ * Test mapper on tuple - Increment Integer field, reorder second and third fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
+ map(new Mapper5());
+
+ tupleMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "2,Hi,1\n" +
+ "3,Hello,2\n" +
+ "4,Hello world,2\n" +
+ "5,Hello world, how are you?,3\n" +
+ "6,I am fine.,3\n" +
+ "7,Luke Skywalker,3\n" +
+ "8,Comment#1,4\n" +
+ "9,Comment#2,4\n" +
+ "10,Comment#3,4\n" +
+ "11,Comment#4,4\n" +
+ "12,Comment#5,5\n" +
+ "13,Comment#6,5\n" +
+ "14,Comment#7,5\n" +
+ "15,Comment#8,5\n" +
+ "16,Comment#9,5\n" +
+ "17,Comment#10,6\n" +
+ "18,Comment#11,6\n" +
+ "19,Comment#12,6\n" +
+ "20,Comment#13,6\n" +
+ "21,Comment#14,6\n" +
+ "22,Comment#15,6\n";
+ }
+
+ public static class Mapper5 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>();
+
+ @Override
+ public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value)
+ throws Exception {
+ Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+ out.setFields(incr, value.f2, value.f1);
+ return out;
+ }
+ }
+
+ @Test
+ public void testMapperOnCustomLowercaseString() throws Exception {
+ /*
+ * Test mapper on Custom - lowercase myString
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> customMapDs = ds.
+ map(new Mapper6());
+
+ customMapDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1,0,hi\n" +
+ "2,1,hello\n" +
+ "2,2,hello world\n" +
+ "3,3,hello world, how are you?\n" +
+ "3,4,i am fine.\n" +
+ "3,5,luke skywalker\n" +
+ "4,6,comment#1\n" +
+ "4,7,comment#2\n" +
+ "4,8,comment#3\n" +
+ "4,9,comment#4\n" +
+ "5,10,comment#5\n" +
+ "5,11,comment#6\n" +
+ "5,12,comment#7\n" +
+ "5,13,comment#8\n" +
+ "5,14,comment#9\n" +
+ "6,15,comment#10\n" +
+ "6,16,comment#11\n" +
+ "6,17,comment#12\n" +
+ "6,18,comment#13\n" +
+ "6,19,comment#14\n" +
+ "6,20,comment#15\n";
+ }
+
+ public static class Mapper6 implements MapFunction<CustomType, CustomType> {
+ private static final long serialVersionUID = 1L;
+ private final CustomType out = new CustomType();
+
+ @Override
+ public CustomType map(CustomType value) throws Exception {
+ out.myInt = value.myInt;
+ out.myLong = value.myLong;
+ out.myString = value.myString.toLowerCase();
+ return out;
+ }
+ }
+
+ @Test
+ public void test() throws Exception {
+ /*
+ * Test mapper if UDF returns input object - increment first field of a tuple
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
+ map(new Mapper7());
+
+ inputObjMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "2,1,Hi\n" +
+ "3,2,Hello\n" +
+ "4,2,Hello world\n" +
+ "5,3,Hello world, how are you?\n" +
+ "6,3,I am fine.\n" +
+ "7,3,Luke Skywalker\n" +
+ "8,4,Comment#1\n" +
+ "9,4,Comment#2\n" +
+ "10,4,Comment#3\n" +
+ "11,4,Comment#4\n" +
+ "12,5,Comment#5\n" +
+ "13,5,Comment#6\n" +
+ "14,5,Comment#7\n" +
+ "15,5,Comment#8\n" +
+ "16,5,Comment#9\n" +
+ "17,6,Comment#10\n" +
+ "18,6,Comment#11\n" +
+ "19,6,Comment#12\n" +
+ "20,6,Comment#13\n" +
+ "21,6,Comment#14\n" +
+ "22,6,Comment#15\n";
+ }
+
+ public static class Mapper7 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+ throws Exception {
+ Integer incr = Integer.valueOf(value.f0.intValue() + 1);
+ value.setField(incr, 0);
+ return value;
+ }
+ }
+
+ @Test
+ public void testMapWithBroadcastSet() throws Exception {
+ /*
+ * Test map with broadcast set
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
+ map(new RichMapper1()).withBroadcastSet(ints, "ints");
+ bcMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "55,1,Hi\n" +
+ "55,2,Hello\n" +
+ "55,2,Hello world\n" +
+ "55,3,Hello world, how are you?\n" +
+ "55,3,I am fine.\n" +
+ "55,3,Luke Skywalker\n" +
+ "55,4,Comment#1\n" +
+ "55,4,Comment#2\n" +
+ "55,4,Comment#3\n" +
+ "55,4,Comment#4\n" +
+ "55,5,Comment#5\n" +
+ "55,5,Comment#6\n" +
+ "55,5,Comment#7\n" +
+ "55,5,Comment#8\n" +
+ "55,5,Comment#9\n" +
+ "55,6,Comment#10\n" +
+ "55,6,Comment#11\n" +
+ "55,6,Comment#12\n" +
+ "55,6,Comment#13\n" +
+ "55,6,Comment#14\n" +
+ "55,6,Comment#15\n";
+ }
+
+ public static class RichMapper1 extends RichMapFunction<Tuple3<Integer,Long,String>,
+ Tuple3<Integer, Long,String>> {
+ private static final long serialVersionUID = 1L;
+ private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
+ private Integer f2Replace = 0;
+
+ @Override
+ public void open(Configuration config) {
+ Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
+ int sum = 0;
+ for(Integer i : ints) {
+ sum += i;
}
-
+ f2Replace = sum;
+ }
+
+ @Override
+ public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
+ throws Exception {
+ out.setFields(f2Replace, value.f1, value.f2);
+ return out;
+ }
+ }
+
+ static final String testKey = "testVariable";
+ static final int testValue = 666;
+
+ @Test
+ public void testPassingConfigurationObject() throws Exception {
+ /*
+ * Test passing configuration object.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ Configuration conf = new Configuration();
+ conf.setInteger(testKey, testValue);
+ DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
+ map(new RichMapper2()).withParameters(conf);
+ bcMapDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,1,Hi\n"
+ + "2,2,Hello\n"
+ + "3,2,Hello world";
+ }
+
+ public static class RichMapper2 extends RichMapFunction<Tuple3<Integer,Long,String>,
+ Tuple3<Integer, Long,String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void open(Configuration config) {
+ int val = config.getInteger(testKey, -1);
+ Assert.assertEquals(testValue, val);
+ }
+
+ @Override
+ public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
+ return value;
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a0f77857/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
index d3c87fa..cf78a34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
@@ -18,11 +18,7 @@
package org.apache.flink.test.javaApiOperators;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -34,227 +30,212 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
-public class PartitionITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 4;
-
- private int curProgId = config.getInteger("ProgramId", -1);
+public class PartitionITCase extends MultipleProgramsTestBase {
+
+ public PartitionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+ super(mode);
+ }
+
private String resultPath;
- private String expectedResult;
-
- public PartitionITCase(Configuration config) {
- super(config);
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
}
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
+
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
}
- @Override
- protected void testProgram() throws Exception {
- expectedResult = PartitionProgs.runProgram(curProgId, resultPath);
+ @Test
+ public void testHashPartitionByKeyField() throws Exception {
+ /*
+ * Test hash partition by key field
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Long> uniqLongs = ds
+ .partitionByHash(1)
+ .mapPartition(new UniqueLongMapper());
+ uniqLongs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n" +
+ "3\n" +
+ "4\n" +
+ "5\n" +
+ "6\n";
}
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(expectedResult, resultPath);
+
+ @Test
+ public void testHashPartitionByKeySelector() throws Exception {
+ /*
+ * Test hash partition by key selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Long> uniqLongs = ds
+ .partitionByHash(new KeySelector1())
+ .mapPartition(new UniqueLongMapper());
+ uniqLongs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n" +
+ "3\n" +
+ "4\n" +
+ "5\n" +
+ "6\n";
}
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+ public static class KeySelector1 implements KeySelector<Tuple3<Integer,Long,String>, Long> {
+ private static final long serialVersionUID = 1L;
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
+ @Override
+ public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
+ return value.f1;
}
-
- return toParameterList(tConfigs);
+
}
-
- private static class PartitionProgs {
-
- public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 0: {
- /*
- * Test hash partition by key field
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Long> uniqLongs = ds
- .partitionByHash(1)
- .mapPartition(new UniqueLongMapper());
- uniqLongs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n" +
- "3\n" +
- "4\n" +
- "5\n" +
- "6\n";
- }
- case 1: {
- /*
- * Test hash partition by key selector
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Long> uniqLongs = ds
- .partitionByHash(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
- return value.f1;
- }
-
- })
- .mapPartition(new UniqueLongMapper());
- uniqLongs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n" +
- "3\n" +
- "4\n" +
- "5\n" +
- "6\n";
- }
- case 2: {
- /*
- * Test forced rebalancing
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // generate some number in parallel
- DataSet<Long> ds = env.generateSequence(1,3000);
- DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
- // introduce some partition skew by filtering
- .filter(new FilterFunction<Long>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Long value) throws Exception {
- if (value <= 780) {
- return false;
- } else {
- return true;
- }
- }
- })
+
+ @Test
+ public void testForcedRebalancing() throws Exception {
+ /*
+ * Test forced rebalancing
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // generate some number in parallel
+ DataSet<Long> ds = env.generateSequence(1,3000);
+ DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
+ // introduce some partition skew by filtering
+ .filter(new Filter1())
// rebalance
- .rebalance()
+ .rebalance()
// count values in each partition
- .map(new PartitionIndexMapper())
- .groupBy(0)
- .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
- return new Tuple2<Integer, Integer>(v1.f0, v1.f1+v2.f1);
- }
- })
+ .map(new PartitionIndexMapper())
+ .groupBy(0)
+ .reduce(new Reducer1())
// round counts to mitigate runtime scheduling effects (lazy split assignment)
- .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
- value.f1 = (value.f1 / 10);
- return value;
- }
-
- });
-
- uniqLongs.writeAsText(resultPath);
-
- env.execute();
-
- StringBuilder result = new StringBuilder();
- int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
- for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
- result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
- }
- // return expected result
- return result.toString();
- }
- case 3: {
- /*
- * Test hash partition by key field and different DOP
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Long> uniqLongs = ds
- .partitionByHash(1).setParallelism(4)
- .mapPartition(new UniqueLongMapper());
- uniqLongs.writeAsText(resultPath);
-
- env.execute();
-
- // return expected result
- return "1\n" +
- "2\n" +
- "3\n" +
- "4\n" +
- "5\n" +
- "6\n";
- }
- case 4: {
- /*
- * Test hash partition with key expression
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(3);
-
- DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
- DataSet<Long> uniqLongs = ds
- .partitionByHash("nestedPojo.longNumber").setParallelism(4)
- .mapPartition(new UniqueNestedPojoLongMapper());
- uniqLongs.writeAsText(resultPath);
-
- env.execute();
-
- // return expected result
- return "10000\n" +
- "20000\n" +
- "30000\n";
- }
-
-
-
- default:
- throw new IllegalArgumentException("Invalid program id");
+ .map(new Mapper1());
+
+ uniqLongs.writeAsText(resultPath);
+
+ env.execute();
+
+ StringBuilder result = new StringBuilder();
+ int numPerPartition = 2220 / env.getDegreeOfParallelism() / 10;
+ for (int i = 0; i < env.getDegreeOfParallelism(); i++) {
+ result.append('(').append(i).append(',').append(numPerPartition).append(")\n");
+ }
+
+ expected = result.toString();
+ }
+
+ public static class Filter1 implements FilterFunction<Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Long value) throws Exception {
+ if (value <= 780) {
+ return false;
+ } else {
+ return true;
}
}
}
-
+
+ public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
+ return new Tuple2<Integer, Integer>(v1.f0, v1.f1+v2.f1);
+ }
+ }
+
+ public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer,
+ Integer>>{
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
+ value.f1 = (value.f1 / 10);
+ return value;
+ }
+
+ }
+
+ @Test
+ public void testHashPartitionByKeyFieldAndDifferentDOP() throws Exception {
+ /*
+ * Test hash partition by key field and different DOP
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(3);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Long> uniqLongs = ds
+ .partitionByHash(1).setParallelism(4)
+ .mapPartition(new UniqueLongMapper());
+ uniqLongs.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "1\n" +
+ "2\n" +
+ "3\n" +
+ "4\n" +
+ "5\n" +
+ "6\n";
+ }
+
+ @Test
+ public void testHashPartitionWithKeyExpression() throws Exception {
+ /*
+ * Test hash partition with key expression
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(3);
+
+ DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+ DataSet<Long> uniqLongs = ds
+ .partitionByHash("nestedPojo.longNumber").setParallelism(4)
+ .mapPartition(new UniqueNestedPojoLongMapper());
+ uniqLongs.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "10000\n" +
+ "20000\n" +
+ "30000\n";
+ }
+
public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer,Long,String>, Long> {
private static final long serialVersionUID = 1L;