You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:22 UTC
[06/22] [FLINK-701] Refactor Java API to use SAM interfaces.
Introduce RichFunction stubs for all UDFs.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 2f749d4..4c8177a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -147,8 +147,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
}
}
- public static final class NeighborWithComponentIDJoin extends JoinFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -161,8 +160,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
}
}
- public static final class MinimumReduce extends GroupReduceFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -189,8 +187,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
}
@SuppressWarnings("serial")
- public static final class MinimumIdFilter extends FlatMapFunction
- <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+ public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
private static LongSumAggregatorWithParameter aggr;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index fa1676f..104c3df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -25,9 +25,9 @@ import java.util.List;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -139,8 +139,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
}
}
- public static final class NeighborWithComponentIDJoin extends JoinFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
@@ -153,8 +152,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
}
}
- public static final class MinimumReduce extends GroupReduceFunction
- <Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private static final long serialVersionUID = 1L;
final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -181,8 +179,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
}
@SuppressWarnings("serial")
- public static final class MinimumIdFilter extends FlatMapFunction
- <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+ public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
private static LongSumAggregator aggr;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 0475a4f..1ec0eb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Iterator;
import java.util.Set;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
@@ -32,7 +32,7 @@ import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
import org.apache.flink.util.Collector;
-public class CustomCompensatableDotProductCoGroup extends AbstractFunction implements GenericCoGrouper<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
+public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
index 28c77ba..b44d914 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Random;
import java.util.Set;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
@@ -31,8 +31,8 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import org.apache.flink.util.Collector;
-public class CustomCompensatableDotProductMatch extends AbstractFunction implements
- GenericJoiner<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
+public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements
+ FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index 74426c0..d83b33b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Set;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GenericCollectorMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.iterative.nephele.ConfigUtils;
@@ -29,7 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
import org.apache.flink.util.Collector;
-public class CustomCompensatingMap extends AbstractFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
+public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 8af9247..1e08a9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -20,15 +20,15 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import org.apache.flink.util.Collector;
-public class CustomRankCombiner extends AbstractFunction implements GenericGroupReduce<VertexWithRank, VertexWithRank>,
- GenericCombine<VertexWithRank>
+public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
+ FlatCombineFunction<VertexWithRank>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index b914c1c..3749c1d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -24,8 +24,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -301,7 +302,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
- public static class Tuple5CoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+ public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
@@ -330,7 +331,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
}
- public static class CustomTypeCoGroup extends CoGroupFunction<CustomType, CustomType, CustomType> {
+ public static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
private static final long serialVersionUID = 1L;
@@ -358,7 +359,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
- public static class MixedCoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+ public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@@ -388,7 +389,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
- public static class MixedCoGroup2 extends CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
+ public static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
private static final long serialVersionUID = 1L;
@@ -417,7 +418,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
- public static class Tuple3ReturnLeft extends CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ public static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@@ -434,7 +435,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
}
- public static class Tuple5ReturnRight extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+ public static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
private static final long serialVersionUID = 1L;
@@ -456,7 +457,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
}
- public static class Tuple5CoGroupBC extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+ public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index dabe7fc..304dda2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.CrossFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -400,7 +401,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
- public static class Tuple5Cross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
+ public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@@ -416,7 +417,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
- public static class CustomTypeCross extends CrossFunction<CustomType, CustomType, CustomType> {
+ public static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> {
private static final long serialVersionUID = 1L;
@@ -429,7 +430,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
- public static class MixedCross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+ public static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@@ -444,7 +445,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
- public static class Tuple3ReturnLeft extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+ public static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@@ -457,7 +458,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
}
- public static class Tuple5ReturnRight extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+ public static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
private static final long serialVersionUID = 1L;
@@ -473,7 +474,7 @@ public class CrossITCase extends JavaProgramTestBase {
}
- public static class Tuple5CrossBC extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+ public static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 203117c..0c6f3cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -164,7 +164,7 @@ public class DistinctITCase extends JavaProgramTestBase {
return in.myInt;
}
})
- .map(new MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() {
+ .map(new RichMapFunction<CustomType, Tuple1<Integer>>() {
@Override
public Tuple1<Integer> map(CustomType value) throws Exception {
return new Tuple1<Integer>(value.myInt);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index 96174da..6613bc1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -38,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class FilterITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 8;
+ private static int NUM_PROGRAMS = 8;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -268,7 +269,7 @@ public class FilterITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+ filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
int literal = -1;
@@ -306,7 +307,7 @@ public class FilterITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+ filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
private int broadcastSum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 1c97347..a6dd377 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -323,7 +324,7 @@ public class FlatMapITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
- flatMap(new FlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+ flatMap(new RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> outTuple =
new Tuple3<Integer, Long, String>();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 6556b5e..7376e86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -24,10 +24,11 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -89,332 +90,336 @@ public class GroupReduceITCase extends JavaProgramTestBase {
private static class GroupReduceProgs {
public static String runProgram(int progId, String resultPath) throws Exception {
-
- switch(progId) {
- case 1: {
+
+ switch (progId) {
+ case 1: {
/*
* check correctness of groupReduce on tuples with key field selector
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> reduceDs = ds.
- groupBy(1).reduceGroup(new Tuple3GroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1\n" +
- "5,2\n" +
- "15,3\n" +
- "34,4\n" +
- "65,5\n" +
- "111,6\n";
- }
- case 2: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+ groupBy(1).reduceGroup(new Tuple3GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1\n" +
+ "5,2\n" +
+ "15,3\n" +
+ "34,4\n" +
+ "65,5\n" +
+ "111,6\n";
+ }
+ case 2: {
/*
* check correctness of groupReduce on tuples with multiple key field selector
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
- DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
- groupBy(4,0).reduceGroup(new Tuple5GroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,0,P-),1\n" +
- "2,3,0,P-),1\n" +
- "2,2,0,P-),2\n" +
- "3,9,0,P-),2\n" +
- "3,6,0,P-),3\n" +
- "4,17,0,P-),1\n" +
- "4,17,0,P-),2\n" +
- "5,11,0,P-),1\n" +
- "5,29,0,P-),2\n" +
- "5,25,0,P-),3\n";
- }
- case 3: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+ groupBy(4, 0).reduceGroup(new Tuple5GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,0,P-),1\n" +
+ "2,3,0,P-),1\n" +
+ "2,2,0,P-),2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,0,P-),3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,0,P-),1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
+ case 3: {
/*
* check correctness of groupReduce on tuples with key field selector and group sorting
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).sortGroup(2,Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,Hello-Hello world\n" +
- "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
- "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
- "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
- "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
-
- }
- case 4: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).sortGroup(2, Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,Hi\n" +
+ "5,2,Hello-Hello world\n" +
+ "15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
+ "34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
+ "65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
+ "111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+
+ }
+ case 4: {
/*
* check correctness of groupReduce on tuples with key extractor
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, Long>> reduceDs = ds.
- groupBy(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Long getKey(Tuple3<Integer, Long, String> in) {
- return in.f1;
- }
- }).reduceGroup(new Tuple3GroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1\n" +
- "5,2\n" +
- "15,3\n" +
- "34,4\n" +
- "65,5\n" +
- "111,6\n";
-
- }
- case 5: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+ groupBy(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long getKey(Tuple3<Integer, Long, String> in) {
+ return in.f1;
+ }
+ }).reduceGroup(new Tuple3GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1\n" +
+ "5,2\n" +
+ "15,3\n" +
+ "34,4\n" +
+ "65,5\n" +
+ "111,6\n";
+
+ }
+ case 5: {
/*
* check correctness of groupReduce on custom type with type extractor
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> reduceDs = ds.
- groupBy(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).reduceGroup(new CustomTypeGroupReduce());
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,Hello!\n" +
- "2,3,Hello!\n" +
- "3,12,Hello!\n" +
- "4,30,Hello!\n" +
- "5,60,Hello!\n" +
- "6,105,Hello!\n";
- }
- case 6: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> reduceDs = ds.
+ groupBy(new KeySelector<CustomType, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }).reduceGroup(new CustomTypeGroupReduce());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,0,Hello!\n" +
+ "2,3,Hello!\n" +
+ "3,12,Hello!\n" +
+ "4,30,Hello!\n" +
+ "5,60,Hello!\n" +
+ "6,105,Hello!\n";
+ }
+ case 6: {
/*
* check correctness of all-groupreduce for tuples
*/
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "231,91,Hello World\n";
- }
- case 7: {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "231,91,Hello World\n";
+ }
+ case 7: {
/*
* check correctness of all-groupreduce for custom types
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "91,210,Hello!";
- }
- case 8: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "91,210,Hello!";
+ }
+ case 8: {
/*
* check correctness of groupReduce with broadcast set
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,55\n" +
- "5,2,55\n" +
- "15,3,55\n" +
- "34,4,55\n" +
- "65,5,55\n" +
- "111,6,55\n";
- }
- case 9: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,55\n" +
+ "5,2,55\n" +
+ "15,3,55\n" +
+ "34,4,55\n" +
+ "65,5,55\n" +
+ "111,6,55\n";
+ }
+ case 9: {
/*
* check correctness of groupReduce if UDF returns input objects multiple times and changes it in between
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "11,1,Hi!\n" +
- "21,1,Hi again!\n" +
- "12,2,Hi!\n" +
- "22,2,Hi again!\n" +
- "13,2,Hi!\n" +
- "23,2,Hi again!\n";
- }
- case 10: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "11,1,Hi!\n" +
+ "21,1,Hi again!\n" +
+ "12,2,Hi!\n" +
+ "22,2,Hi again!\n" +
+ "13,2,Hi!\n" +
+ "23,2,Hi again!\n";
+ }
+ case 10: {
/*
* check correctness of groupReduce on custom type with key extractor and combine
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
- DataSet<CustomType> reduceDs = ds.
- groupBy(new KeySelector<CustomType, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(CustomType in) {
- return in.myInt;
- }
- }).reduceGroup(new CustomTypeGroupReduceWithCombine());
-
- reduceDs.writeAsText(resultPath);
- env.execute();
-
- // return expected result
- return "1,0,test1\n" +
- "2,3,test2\n" +
- "3,12,test3\n" +
- "4,30,test4\n" +
- "5,60,test5\n" +
- "6,105,test6\n";
- }
- case 11: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+ DataSet<CustomType> reduceDs = ds.
+ groupBy(new KeySelector<CustomType, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(CustomType in) {
+ return in.myInt;
+ }
+ }).reduceGroup(new CustomTypeGroupReduceWithCombine());
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,0,test1\n" +
+ "2,3,test2\n" +
+ "3,12,test3\n" +
+ "4,30,test4\n" +
+ "5,60,test5\n" +
+ "6,105,test6\n";
+ }
+ case 11: {
/*
* check correctness of groupReduce on tuples with combine
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<Integer, String>> reduceDs = ds.
- groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,test1\n" +
- "5,test2\n" +
- "15,test3\n" +
- "34,test4\n" +
- "65,test5\n" +
- "111,test6\n";
- }
- // all-groupreduce with combine
- case 12: {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<Integer, String>> reduceDs = ds.
+ groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,test1\n" +
+ "5,test2\n" +
+ "15,test3\n" +
+ "34,test4\n" +
+ "65,test5\n" +
+ "111,test6\n";
+ }
+ // all-groupreduce with combine
+ case 12: {
/*
* check correctness of all-groupreduce for tuples with combine
*/
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env)
- .map(new IdentityMapper<Tuple3<Integer,Long,String>>()).setParallelism(4);
-
- Configuration cfg = new Configuration();
- cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
- DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
- .withParameters(cfg);
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
- }
- // descending sort not working
- case 13: {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env)
+ .map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
+
+ Configuration cfg = new Configuration();
+ cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+ DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
+ .withParameters(cfg);
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
+ }
+ // descending sort not working
+ case 13: {
/*
* check correctness of groupReduce on tuples with key field selector and group sorting
*/
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
- groupBy(1).sortGroup(2,Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
-
- reduceDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "1,1,Hi\n" +
- "5,2,Hello world-Hello\n" +
- "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
- "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
- "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
- "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
-
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+ groupBy(1).sortGroup(2, Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,Hi\n" +
+ "5,2,Hello world-Hello\n" +
+ "15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
+ "34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+ "65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+ "111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+ }
+ default: {
+ throw new IllegalArgumentException("Invalid program id");
+ }
}
}
}
- public static class Tuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+ public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
@@ -436,7 +441,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class Tuple3SortedGroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ public static class Tuple3SortedGroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@@ -462,7 +467,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class Tuple5GroupReduce extends GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+ public static class Tuple5GroupReduce implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
private static final long serialVersionUID = 1L;
@Override
@@ -486,7 +491,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class CustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> {
+ public static class CustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
private static final long serialVersionUID = 1L;
@@ -511,8 +516,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
-
- public static class InputReturningTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+
+ public static class InputReturningTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -534,7 +540,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class AllAddingTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+ public static class AllAddingTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -554,7 +560,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class AllAddingCustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> {
+ public static class AllAddingCustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
private static final long serialVersionUID = 1L;
@Override
@@ -579,7 +585,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static class BCTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> {
+ public static class BCTuple3GroupReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private String f2Replace = "";
@@ -613,8 +619,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
- public static class Tuple3GroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+ @RichGroupReduceFunction.Combinable
+ public static class Tuple3GroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -650,8 +656,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
- public static class Tuple3AllGroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+ @RichGroupReduceFunction.Combinable
+ public static class Tuple3AllGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -686,8 +692,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
- public static class CustomTypeGroupReduceWithCombine extends GroupReduceFunction<CustomType, CustomType> {
+ @RichGroupReduceFunction.Combinable
+ public static class CustomTypeGroupReduceWithCombine extends RichGroupReduceFunction<CustomType, CustomType> {
private static final long serialVersionUID = 1L;
@Override
@@ -723,7 +729,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
}
}
- public static final class IdentityMapper<T> extends MapFunction<T, T> {
+ public static final class IdentityMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) { return value; }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index ffad949..a293cbf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -33,6 +35,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -101,7 +104,7 @@ public class JoinITCase extends JavaProgramTestBase {
ds1.join(ds2)
.where(1)
.equalTo(1)
- .with(new T3T5Join());
+ .with(new T3T5FlatJoin());
joinDs.writeAsCsv(resultPath);
env.execute();
@@ -126,7 +129,7 @@ public class JoinITCase extends JavaProgramTestBase {
ds1.join(ds2)
.where(0,1)
.equalTo(0,4)
- .with(new T3T5Join());
+ .with(new T3T5FlatJoin());
joinDs.writeAsCsv(resultPath);
env.execute();
@@ -177,7 +180,7 @@ public class JoinITCase extends JavaProgramTestBase {
DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
.where(1)
.equalTo(1)
- .with(new T3T5Join());
+ .with(new T3T5FlatJoin());
joinDs.writeAsCsv(resultPath);
env.execute();
@@ -202,7 +205,7 @@ public class JoinITCase extends JavaProgramTestBase {
ds1.joinWithTiny(ds2)
.where(1)
.equalTo(1)
- .with(new T3T5Join());
+ .with(new T3T5FlatJoin());
joinDs.writeAsCsv(resultPath);
env.execute();
@@ -292,35 +295,35 @@ public class JoinITCase extends JavaProgramTestBase {
}
case 9: {
- /*
- * Join on a tuple input with key field selector and a custom type input with key extractor
- */
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ /*
+ * Join on a tuple input with key field selector and a custom type input with key extractor
+ */
- DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
- DataSet<Tuple2<String, String>> joinDs =
- ds1.join(ds2)
- .where(new KeySelector<CustomType, Integer>() {
- @Override
- public Integer getKey(CustomType value) {
- return value.myInt;
- }
- }
- )
- .equalTo(0)
- .with(new CustT3Join());
-
- joinDs.writeAsCsv(resultPath);
- env.execute();
-
- // return expected result
- return "Hi,Hi\n" +
- "Hello,Hello\n" +
- "Hello world,Hello\n";
-
- }
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(new KeySelector<CustomType, Integer>() {
+ @Override
+ public Integer getKey(CustomType value) {
+ return value.myInt;
+ }
+ }
+ )
+ .equalTo(0)
+ .with(new CustT3Join());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "Hi,Hi\n" +
+ "Hello,Hello\n" +
+ "Hello world,Hello\n";
+
+ }
case 10: {
/*
@@ -458,38 +461,39 @@ public class JoinITCase extends JavaProgramTestBase {
}
- public static class T3T5Join extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
+ public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
@Override
- public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
- Tuple5<Integer, Long, Integer, String, Long> second) {
-
- return new Tuple2<String,String>(first.f2, second.f3);
+ public void join(Tuple3<Integer, Long, String> first,
+ Tuple5<Integer, Long, Integer, String, Long> second,
+ Collector<Tuple2<String,String>> out) {
+
+ out.collect (new Tuple2<String,String> (first.f2, second.f3));
}
-
+
}
- public static class LeftReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+ public static class LeftReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
@Override
public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> first,
- Tuple5<Integer, Long, Integer, String, Long> second) {
+ Tuple5<Integer, Long, Integer, String, Long> second) {
return first;
}
}
- public static class RightReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+ public static class RightReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
@Override
public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> first,
- Tuple5<Integer, Long, Integer, String, Long> second) {
+ Tuple5<Integer, Long, Integer, String, Long> second) {
return second;
}
}
- public static class T3T5BCJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
+ public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
private int broadcast;
@@ -505,6 +509,7 @@ public class JoinITCase extends JavaProgramTestBase {
}
+ /*
@Override
public Tuple3<String, String, Integer> join(
Tuple3<Integer, Long, String> first,
@@ -512,19 +517,25 @@ public class JoinITCase extends JavaProgramTestBase {
return new Tuple3<String, String, Integer>(first.f2, second.f3, broadcast);
}
+ */
+
+ @Override
+ public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ out.collect(new Tuple3<String, String, Integer> (first.f2, second.f3, broadcast));
+ }
}
- public static class T3CustJoin extends JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
+ public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
@Override
public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
- CustomType second) {
+ CustomType second) {
return new Tuple2<String, String>(first.f2, second.myString);
}
}
- public static class CustT3Join extends JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
+ public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
@Override
public Tuple2<String, String> join(CustomType first, Tuple3<Integer, Long, String> second) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index 0921e82..4f1fb1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -25,7 +25,8 @@ import java.util.LinkedList;
import org.junit.Assert;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -396,7 +397,7 @@ public class MapITCase extends JavaProgramTestBase {
DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
- map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+ map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
private Integer f2Replace = 0;
@@ -457,7 +458,7 @@ public class MapITCase extends JavaProgramTestBase {
final int testValue = 666;
conf.setInteger(testKey, testValue);
DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
- map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+ map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 6cc1061..a296a09 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -23,8 +23,9 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
@@ -270,7 +271,7 @@ public class ReduceITCase extends JavaProgramTestBase {
"65,5,Hi again!\n" +
"111,6,Hi again!\n";
}
- default:
+ default:
throw new IllegalArgumentException("Invalid program id");
}
@@ -278,7 +279,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
- public static class Tuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+ public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
private final String f2Replace;
@@ -306,7 +307,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class Tuple5Reduce extends ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
+ public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
private static final long serialVersionUID = 1L;
private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
@@ -321,7 +322,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class CustomTypeReduce extends ReduceFunction<CustomType> {
+ public static class CustomTypeReduce implements ReduceFunction<CustomType> {
private static final long serialVersionUID = 1L;
private final CustomType out = new CustomType();
@@ -336,7 +337,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class InputReturningTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+ public static class InputReturningTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
@Override
@@ -350,7 +351,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class AllAddingTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+ public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
@@ -364,7 +365,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class AllAddingCustomTypeReduce extends ReduceFunction<CustomType> {
+ public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
private static final long serialVersionUID = 1L;
private final CustomType out = new CustomType();
@@ -379,7 +380,7 @@ public class ReduceITCase extends JavaProgramTestBase {
}
}
- public static class BCTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+ public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
private static final long serialVersionUID = 1L;
private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
private String f2Replace = "";
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 09191cc..a636ba4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -145,7 +145,7 @@ public class UnionITCase extends JavaProgramTestBase {
// Don't know how to make an empty result in an other way than filtering it
DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
- filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+ filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index ed573be..aaad08c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
+import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -49,6 +50,7 @@ import java.util.LinkedList;
/**
*/
@RunWith(Parameterized.class)
+//@Ignore("Test needs to be adapted to new cross signature")
public class CrossITCase extends RecordAPITestBase {
private static final Log LOG = LogFactory.getLog(CrossITCase.class);
@@ -61,13 +63,30 @@ public class CrossITCase extends RecordAPITestBase {
super(testConfig);
}
- private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
+ //private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
- private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
+ //private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
- private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
- + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
- + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
+ //private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
+ // + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
+ // + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
+
+ //private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 1\n4 2\n4 2\n5 0\n5 0\n5 1\n," +
+ // "5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" +
+ // "7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" +
+ // "8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n";
+
+ //private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" +
+ // "7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" +
+ // "6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" +
+ // "8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" +
+ // "6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6";
+
+
+ private static final String LEFT_IN = "1 1\n2 2\n3 3\n";
+ private static final String RIGHT_IN = "3 6\n4 4\n4 8\n";
+
+ private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 4\n9 2\n9 6\n";
@Override
protected void preSubmit() throws Exception {
@@ -84,7 +103,7 @@ public class CrossITCase extends RecordAPITestBase {
private IntValue integer = new IntValue();
@Override
- public void cross(Record record1, Record record2, Collector<Record> out) {
+ public Record cross(Record record1, Record record2) throws Exception {
string = record1.getField(1, string);
int val1 = Integer.parseInt(string.toString());
string = record2.getField(1, string);
@@ -95,16 +114,14 @@ public class CrossITCase extends RecordAPITestBase {
int key2 = Integer.parseInt(string.toString());
LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }");
-
- if (val1 + val2 <= 6) {
- string.setValue((key1 + key2 + 2) + "");
- integer.setValue(val2 - val1 + 1);
-
- record1.setField(0, string);
- record1.setField(1, integer);
-
- out.collect(record1);
- }
+
+ string.setValue((key1 + key2 + 2) + "");
+ integer.setValue(val2 - val1 + 1);
+
+ record1.setField(0, string);
+ record1.setField(1, integer);
+
+ return record1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
index a545e05..0f58d18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
@@ -47,19 +47,19 @@ public class ComputeDistance extends CrossFunction implements Serializable {
* 3: distance
*/
@Override
- public void cross(Record dataPointRecord, Record clusterCenterRecord, Collector<Record> out) {
-
+ public Record cross(Record dataPointRecord, Record clusterCenterRecord) throws Exception {
+
CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-
+
IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-
+
this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-
- // add cluster center id and distance to the data point record
+
+ // add cluster center id and distance to the data point record
dataPointRecord.setField(2, clusterCenterId);
dataPointRecord.setField(3, this.distance);
-
- out.collect(dataPointRecord);
+
+ return dataPointRecord;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
index 15640c0..441dc39 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
@@ -89,7 +89,7 @@ public class TPCHQuery3 implements Program, ProgramDescription {
/**
* Reads the filter literals from the configuration.
*
- * @see org.apache.flink.api.common.functions.Function#open(org.apache.flink.configuration.Configuration)
+ * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)
*/
@Override
public void open(Configuration parameters) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 97367f7..7149cd3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
@@ -170,7 +170,7 @@ public class KMeansForTest implements Program {
// *************************************************************************
/** Converts a Tuple2<Double,Double> into a Point. */
- public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
+ public static final class TuplePointConverter extends RichMapFunction<Tuple2<Double, Double>, Point> {
@Override
public Point map(Tuple2<Double, Double> t) throws Exception {
@@ -179,7 +179,7 @@ public class KMeansForTest implements Program {
}
/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
- public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+ public static final class TupleCentroidConverter extends RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> {
@Override
public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
@@ -188,7 +188,7 @@ public class KMeansForTest implements Program {
}
/** Determines the closest cluster center for a data point. */
- public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
+ public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
private Collection<Centroid> centroids;
/** Reads the centroid values from a broadcast variable into a collection. */
@@ -236,7 +236,7 @@ public class KMeansForTest implements Program {
}
/** Appends a count variable to the tuple. */
- public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
+ public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
@Override
public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
@@ -245,7 +245,7 @@ public class KMeansForTest implements Program {
}
/** Sums and counts point coordinates. */
- public static final class CentroidAccumulator extends ReduceFunction<DummyTuple3IntPointLong> {
+ public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> {
@Override
public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) {
@@ -254,7 +254,7 @@ public class KMeansForTest implements Program {
}
/** Computes new centroid from coordinate sum and count of points. */
- public static final class CentroidAverager extends MapFunction<DummyTuple3IntPointLong, Centroid> {
+ public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> {
@Override
public Centroid map(DummyTuple3IntPointLong value) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e0f153..8058539 100644
--- a/pom.xml
+++ b/pom.xml
@@ -274,6 +274,9 @@ under the License.
<activeByDefault>false</activeByDefault>
<jdk>1.8</jdk>
</activation>
+ <modules>
+ <module>flink-java8-tests</module>
+ </modules>
<build>
<plugins>
<plugin>