You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:50 UTC
[09/16] git commit: [FLINK-701] Refactor Java API to use SAM
interfaces. Introduce RichFunction stubs for all UDFs.
[FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22b24f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22b24f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22b24f20
Branch: refs/heads/master
Commit: 22b24f208de2501311fe504e767526690742e30e
Parents: 9d1c49f
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Fri Jul 18 15:37:19 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 17:47:35 2014 +0200
----------------------------------------------------------------------
.../flink/api/avro/AvroOutputFormatTest.java | 6 +-
.../avro/testjar/AvroExternalJarProgram.java | 8 +-
.../mapred/example/WordCount.java | 8 +-
.../mapreduce/example/WordCount.java | 8 +-
.../spargel/java/VertexCentricIteration.java | 8 +-
.../examples/SpargelConnectedComponents.java | 4 +-
.../spargel/java/examples/SpargelPageRank.java | 8 +-
.../SpargelPageRankCountingVertices.java | 14 +-
.../SpargelConnectedComponentsITCase.java | 4 +-
.../apache/flink/client/testjar/WordCount.java | 4 +-
.../flink/compiler/util/NoOpFunction.java | 4 +-
.../compiler/BranchingPlansCompilerTest.java | 5 +-
.../CachedMatchStrategyCompilerTest.java | 5 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 9 +-
.../apache/flink/compiler/CompilerTestBase.java | 8 +-
.../compiler/FeedbackPropertiesMatchTest.java | 10 +-
.../compiler/GroupReduceCompilationTest.java | 20 +-
.../flink/compiler/IterationsCompilerTest.java | 20 +-
.../flink/compiler/ReduceCompilationTest.java | 10 +-
.../compiler/UnionPropertyPropagationTest.java | 4 +-
.../WorksetIterationsJavaApiCompilerTest.java | 18 +-
.../testfunctions/DummyFlatJoinFunction.java | 33 ++
.../testfunctions/DummyJoinFunction.java | 32 -
.../testfunctions/IdentityGroupReducer.java | 6 +-
.../compiler/testfunctions/IdentityMapper.java | 4 +-
.../testfunctions/SelectOneReducer.java | 4 +-
.../testfunctions/Top1GroupReducer.java | 6 +-
.../flink/compiler/util/DummyCrossStub.java | 9 +-
.../api/common/functions/AbstractFunction.java | 76 ---
.../common/functions/AbstractRichFunction.java | 76 +++
.../api/common/functions/CoGroupFunction.java | 41 ++
.../api/common/functions/CombineFunction.java | 31 +
.../api/common/functions/CrossFunction.java | 42 ++
.../api/common/functions/FilterFunction.java | 33 ++
.../common/functions/FlatCombineFunction.java | 33 ++
.../api/common/functions/FlatJoinFunction.java | 30 +
.../api/common/functions/FlatMapFunction.java | 45 ++
.../flink/api/common/functions/Function.java | 75 +--
.../api/common/functions/GenericCoGrouper.java | 40 --
.../common/functions/GenericCollectorMap.java | 2 +-
.../api/common/functions/GenericCombine.java | 32 -
.../api/common/functions/GenericCrosser.java | 41 --
.../api/common/functions/GenericFilter.java | 33 --
.../api/common/functions/GenericFlatMap.java | 42 --
.../common/functions/GenericGroupReduce.java | 44 --
.../api/common/functions/GenericJoiner.java | 28 -
.../flink/api/common/functions/GenericMap.java | 30 -
.../api/common/functions/GenericReduce.java | 26 -
.../common/functions/GroupReduceFunction.java | 45 ++
.../api/common/functions/JoinFunction.java | 28 +
.../flink/api/common/functions/MapFunction.java | 38 ++
.../api/common/functions/ReduceFunction.java | 39 ++
.../api/common/functions/RichFunction.java | 97 ++++
.../api/common/functions/RuntimeContext.java | 2 +-
.../common/functions/util/FunctionUtils.java | 81 +++
.../flink/api/common/operators/Union.java | 6 +-
.../operators/base/BulkIterationBase.java | 8 +-
.../operators/base/CoGroupOperatorBase.java | 6 +-
.../operators/base/CrossOperatorBase.java | 6 +-
.../operators/base/DeltaIterationBase.java | 6 +-
.../operators/base/FilterOperatorBase.java | 6 +-
.../operators/base/FlatMapOperatorBase.java | 6 +-
.../operators/base/GroupReduceOperatorBase.java | 17 +-
.../common/operators/base/JoinOperatorBase.java | 6 +-
.../common/operators/base/MapOperatorBase.java | 4 +-
.../operators/base/ReduceOperatorBase.java | 6 +-
.../api/common/operators/util/OperatorUtil.java | 18 +-
.../common/operators/util/OperatorUtilTest.java | 21 +-
flink-examples/flink-java-examples/pom.xml | 1 -
.../flink/example/java/clustering/KMeans.java | 17 +-
.../example/java/graph/ConnectedComponents.java | 24 +-
.../example/java/graph/EnumTrianglesBasic.java | 14 +-
.../example/java/graph/EnumTrianglesOpt.java | 28 +-
.../flink/example/java/graph/PageRankBasic.java | 18 +-
.../java/graph/TransitiveClosureNaive.java | 4 +-
.../flink/example/java/ml/LinearRegression.java | 15 +-
.../relational/EmptyFieldsCountAccumulator.java | 4 +-
.../java/relational/RelationalQuery.java | 2 +-
.../example/java/relational/TPCHQuery10.java | 4 +-
.../example/java/relational/TPCHQuery3.java | 4 +-
.../example/java/relational/WebLogAnalysis.java | 12 +-
.../flink/example/java/wordcount/WordCount.java | 4 +-
.../example/java/wordcount/WordCountPOJO.java | 6 +-
flink-examples/pom.xml | 1 -
.../java/org/apache/flink/api/java/DataSet.java | 93 +--
.../apache/flink/api/java/DeltaIteration.java | 2 +-
.../flink/api/java/ExecutionEnvironment.java | 6 +-
.../apache/flink/api/java/IterativeDataSet.java | 2 +-
.../api/java/functions/CoGroupFunction.java | 74 ---
.../flink/api/java/functions/CrossFunction.java | 76 ---
.../api/java/functions/FilterFunction.java | 57 --
.../api/java/functions/FlatMapFunction.java | 59 --
.../api/java/functions/FlatMapIterator.java | 4 +-
.../api/java/functions/FunctionAnnotation.java | 16 +-
.../api/java/functions/GroupReduceFunction.java | 114 ----
.../api/java/functions/GroupReduceIterator.java | 2 +-
.../flink/api/java/functions/JoinFunction.java | 93 ---
.../flink/api/java/functions/MapFunction.java | 59 --
.../api/java/functions/ReduceFunction.java | 63 --
.../api/java/functions/RichCoGroupFunction.java | 74 +++
.../api/java/functions/RichCrossFunction.java | 67 +++
.../api/java/functions/RichFilterFunction.java | 57 ++
.../java/functions/RichFlatCombineFunction.java | 34 ++
.../java/functions/RichFlatJoinFunction.java | 75 +++
.../api/java/functions/RichFlatMapFunction.java | 59 ++
.../java/functions/RichGroupReduceFunction.java | 114 ++++
.../api/java/functions/RichJoinFunction.java | 31 +
.../api/java/functions/RichMapFunction.java | 59 ++
.../api/java/functions/RichReduceFunction.java | 63 ++
.../UnsupportedLambdaExpressionException.java | 30 +
.../api/java/operators/AggregateOperator.java | 20 +-
.../api/java/operators/CoGroupOperator.java | 52 +-
.../flink/api/java/operators/CrossOperator.java | 18 +-
.../api/java/operators/DistinctOperator.java | 44 +-
.../api/java/operators/FilterOperator.java | 6 +-
.../api/java/operators/FlatMapOperator.java | 7 +-
.../api/java/operators/GroupReduceOperator.java | 224 +++++++
.../flink/api/java/operators/Grouping.java | 4 +-
.../flink/api/java/operators/JoinOperator.java | 163 ++++--
.../flink/api/java/operators/MapOperator.java | 10 +-
.../api/java/operators/ProjectOperator.java | 4 +-
.../api/java/operators/ReduceGroupOperator.java | 215 -------
.../api/java/operators/ReduceOperator.java | 19 +-
.../java/operators/SingleInputUdfOperator.java | 4 +-
.../api/java/operators/SortedGrouping.java | 20 +-
.../api/java/operators/TwoInputUdfOperator.java | 4 +-
.../flink/api/java/operators/UdfOperator.java | 10 +-
.../api/java/operators/UnsortedGrouping.java | 24 +-
.../translation/KeyExtractingMapper.java | 4 +-
.../translation/KeyRemovingMapper.java | 4 +-
.../translation/PlanFilterOperator.java | 8 +-
.../translation/PlanProjectOperator.java | 10 +-
.../PlanUnwrappingCoGroupOperator.java | 13 +-
.../translation/PlanUnwrappingJoinOperator.java | 28 +-
.../PlanUnwrappingReduceGroupOperator.java | 21 +-
.../PlanUnwrappingReduceOperator.java | 7 +-
.../translation/TupleKeyExtractingMapper.java | 4 +-
.../operators/translation/WrappingFunction.java | 22 +-
.../java/record/functions/CoGroupFunction.java | 5 +-
.../java/record/functions/CrossFunction.java | 19 +-
.../api/java/record/functions/JoinFunction.java | 8 +-
.../api/java/record/functions/MapFunction.java | 4 +-
.../java/record/functions/ReduceFunction.java | 8 +-
.../flink/api/java/typeutils/TypeExtractor.java | 66 ++-
.../java/typeutils/runtime/TupleComparator.java | 2 +-
.../SemanticPropertiesTranslationTest.java | 12 +-
.../DeltaIterationTranslationTest.java | 26 +-
.../translation/ReduceTranslationTests.java | 10 +-
.../java/type/extractor/TypeExtractorTest.java | 126 ++--
flink-java8-tests/pom.xml | 145 +++++
.../javaApiOperators/lambdas/CoGroupITCase.java | 70 +++
.../javaApiOperators/lambdas/CrossITCase.java | 59 ++
.../javaApiOperators/lambdas/FilterITCase.java | 142 +++++
.../lambdas/FlatJoinITCase.java | 58 ++
.../javaApiOperators/lambdas/FlatMapITCase.java | 46 ++
.../lambdas/GroupReduceITCase.java | 84 +++
.../javaApiOperators/lambdas/JoinITCase.java | 58 ++
.../javaApiOperators/lambdas/MapITCase.java | 48 ++
.../javaApiOperators/lambdas/ReduceITCase.java | 160 +++++
.../task/AbstractIterativePactTask.java | 4 +-
.../iterative/task/IterationHeadPactTask.java | 4 +-
.../task/IterationIntermediatePactTask.java | 4 +-
.../iterative/task/IterationTailPactTask.java | 4 +-
.../AbstractCachedBuildSideMatchDriver.java | 6 +-
.../runtime/operators/AllGroupReduceDriver.java | 24 +-
.../runtime/operators/AllReduceDriver.java | 16 +-
.../flink/runtime/operators/CoGroupDriver.java | 14 +-
.../CoGroupWithSolutionSetFirstDriver.java | 14 +-
.../CoGroupWithSolutionSetSecondDriver.java | 14 +-
.../flink/runtime/operators/CrossDriver.java | 32 +-
.../flink/runtime/operators/FlatMapDriver.java | 16 +-
.../operators/GroupReduceCombineDriver.java | 14 +-
.../runtime/operators/GroupReduceDriver.java | 16 +-
.../JoinWithSolutionSetFirstDriver.java | 14 +-
.../JoinWithSolutionSetSecondDriver.java | 14 +-
.../flink/runtime/operators/MapDriver.java | 16 +-
.../flink/runtime/operators/MatchDriver.java | 16 +-
.../flink/runtime/operators/NoOpDriver.java | 10 +-
.../runtime/operators/ReduceCombineDriver.java | 16 +-
.../flink/runtime/operators/ReduceDriver.java | 16 +-
.../runtime/operators/RegularPactTask.java | 42 +-
.../chaining/ChainedCollectorMapDriver.java | 4 +-
.../chaining/ChainedFlatMapDriver.java | 17 +-
.../operators/chaining/ChainedMapDriver.java | 14 +-
.../ChainedTerminationCriterionDriver.java | 4 +-
.../SynchronousChainedCombineDriver.java | 13 +-
.../hash/BuildFirstHashMatchIterator.java | 4 +-
.../hash/BuildSecondHashMatchIterator.java | 4 +-
.../sort/CombiningUnilateralSortMerger.java | 19 +-
.../operators/sort/MergeMatchIterator.java | 14 +-
.../operators/util/JoinTaskIterator.java | 4 +-
.../runtime/operators/CachedMatchTaskTest.java | 7 +-
.../operators/CoGroupTaskExternalITCase.java | 9 +-
.../runtime/operators/CoGroupTaskTest.java | 11 +-
.../operators/CombineTaskExternalITCase.java | 6 +-
.../runtime/operators/CombineTaskTest.java | 6 +-
.../operators/CrossTaskExternalITCase.java | 6 +-
.../flink/runtime/operators/CrossTaskTest.java | 22 +-
.../operators/MatchTaskExternalITCase.java | 6 +-
.../flink/runtime/operators/MatchTaskTest.java | 14 +-
.../operators/ReduceTaskExternalITCase.java | 6 +-
.../flink/runtime/operators/ReduceTaskTest.java | 6 +-
.../drivers/AllGroupReduceDriverTest.java | 20 +-
.../operators/drivers/AllReduceDriverTest.java | 32 +-
.../drivers/GroupReduceDriverTest.java | 20 +-
.../drivers/ReduceCombineDriverTest.java | 32 +-
.../operators/drivers/ReduceDriverTest.java | 32 +-
.../operators/hash/HashMatchIteratorITCase.java | 18 +-
.../sort/SortMergeMatchIteratorITCase.java | 4 +-
.../operators/testutils/DriverTestBase.java | 8 +-
.../operators/testutils/TaskTestBase.java | 6 +-
.../operators/util/HashVsSortMiniBenchmark.java | 4 +-
.../api/scala/functions/CrossFunction.scala | 28 +-
.../api/scala/operators/CrossOperator.scala | 69 +--
.../api/scala/operators/IterateOperators.scala | 2 +-
.../test/compiler/util/CompilerTestBase.java | 4 +-
.../test/compiler/util/OperatorResolver.java | 4 +-
flink-tests/pom.xml | 2 +-
...ultipleJoinsWithSolutionSetCompilerTest.java | 14 +-
.../BulkIterationWithAllReducerITCase.java | 4 +-
.../CoGroupConnectedComponentsSecondITCase.java | 12 +-
.../DependencyConnectedComponentsITCase.java | 23 +-
.../aggregators/AggregatorsITCase.java | 20 +-
...nentsWithParametrizableAggregatorITCase.java | 15 +-
...entsWithParametrizableConvergenceITCase.java | 15 +-
.../CustomCompensatableDotProductCoGroup.java | 6 +-
.../CustomCompensatableDotProductMatch.java | 8 +-
.../CustomCompensatingMap.java | 4 +-
.../CustomRankCombiner.java | 10 +-
.../test/javaApiOperators/CoGroupITCase.java | 17 +-
.../test/javaApiOperators/CrossITCase.java | 15 +-
.../test/javaApiOperators/DistinctITCase.java | 4 +-
.../test/javaApiOperators/FilterITCase.java | 9 +-
.../test/javaApiOperators/FlatMapITCase.java | 5 +-
.../javaApiOperators/GroupReduceITCase.java | 578 ++++++++++---------
.../flink/test/javaApiOperators/JoinITCase.java | 105 ++--
.../flink/test/javaApiOperators/MapITCase.java | 7 +-
.../test/javaApiOperators/ReduceITCase.java | 19 +-
.../test/javaApiOperators/UnionITCase.java | 4 +-
.../flink/test/operators/CrossITCase.java | 49 +-
.../recordJobs/kmeans/udfs/ComputeDistance.java | 16 +-
.../test/recordJobs/relational/TPCHQuery3.java | 2 +-
.../flink/test/util/testjar/KMeansForTest.java | 16 +-
pom.xml | 3 +
244 files changed, 4106 insertions(+), 2872 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 637a5e9..386f318 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.record.io.avro.example.User;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -125,7 +125,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase {
}
- public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
+ public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
@Override
public User map(Tuple3<String, Integer, String> value) throws Exception {
@@ -133,7 +133,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase {
}
}
- public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
+ public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
@Override
public ReflectiveUser map(User value) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 146c72b..75b7da6 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -141,7 +141,7 @@ public class AvroExternalJarProgram {
// --------------------------------------------------------------------------------------------
- public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
+ public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
private static final long serialVersionUID = 1L;
@Override
@@ -151,7 +151,7 @@ public class AvroExternalJarProgram {
}
}
- public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
+ public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
index 4e8ffa9..ba09e77 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -18,6 +18,7 @@
package org.apache.flink.hadoopcompatibility.mapred.example;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -28,8 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
@@ -88,7 +88,7 @@ public class WordCount {
/**
* Splits a line into words and converts Hadoop Writables into normal Java data types.
*/
- public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+ public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
@Override
public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
@@ -108,7 +108,7 @@ public class WordCount {
/**
* Converts Java data types to Hadoop Writables.
*/
- public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+ public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
@Override
public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 36ea378..c00a14a 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -19,8 +19,8 @@
package org.apache.flink.hadoopcompatibility.mapreduce.example;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
@@ -89,7 +89,7 @@ public class WordCount {
/**
* Splits a line into words and converts Hadoop Writables into normal Java data types.
*/
- public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+ public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
@Override
public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
@@ -109,7 +109,7 @@ public class WordCount {
/**
* Converts Java data types to Hadoop Writables.
*/
- public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+ public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
@Override
public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index bb84cea..65be2f8 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -393,7 +393,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
// --------------------------------------------------------------------------------------------
private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
- extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+ extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
{
private static final long serialVersionUID = 1L;
@@ -463,7 +463,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
* UDF that encapsulates the message sending function for graphs where the edges have no associated values.
*/
private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
- extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+ extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
{
private static final long serialVersionUID = 1L;
@@ -516,7 +516,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
* UDF that encapsulates the message sending function for graphs where the edges have an associated value.
*/
private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
- extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+ extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
{
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index ea90feb..a4ba6fa 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.spargel.java.MessageIterator;
import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
* A map function that takes a Long value and creates a 2-tuple out of it:
* <pre>(Long value) -> (value, value)</pre>
*/
- public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> {
+ public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Long value) {
return new Tuple2<Long, Long>(value, value);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index c7fbaaa..9dfc327 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
// enumerate some sample edges and assign an initial uniform probability (rank)
DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
- .map(new MapFunction<Long, Tuple2<Long, Double>>() {
+ .map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
public Tuple2<Long, Double> map(Long value) {
return new Tuple2<Long, Double>(value, 1.0/numVertices);
}
@@ -56,7 +56,7 @@ public class SpargelPageRank {
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
- .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+ .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (numVertices / 2));
for (int i = 0; i < numOutEdges; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 34c9ad8..43c0b84 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,9 @@
package org.apache.flink.spargel.java.examples;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
@@ -53,7 +53,7 @@ public class SpargelPageRankCountingVertices {
// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
- .flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+ .flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +67,12 @@ public class SpargelPageRankCountingVertices {
// count the number of vertices
DataSet<Long> count = vertices
- .map(new MapFunction<Long, Long>() {
+ .map(new RichMapFunction<Long, Long>() {
public Long map(Long value) {
return 1L;
}
})
- .reduce(new ReduceFunction<Long>() {
+ .reduce(new RichReduceFunction<Long>() {
public Long reduce(Long value1, Long value2) {
return value1 + value2;
}
@@ -80,7 +80,7 @@ public class SpargelPageRankCountingVertices {
// enumerate some sample edges and assign an initial uniform probability (rank)
DataSet<Tuple2<Long, Double>> intialRanks = vertices
- .map(new MapFunction<Long, Tuple2<Long, Double>>() {
+ .map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
private long numVertices;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
index a34f2db..16b004c 100644
--- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.spargel;
import java.io.BufferedReader;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -72,7 +72,7 @@ public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
}
}
- public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> {
+ public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(String value) {
String[] nums = value.split(" ");
return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 7320b7b..2b64b84 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -19,7 +19,7 @@
package org.apache.flink.client.testjar;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@@ -75,7 +75,7 @@ public class WordCount {
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
- public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+ public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
index 9f10be5..6eccc8a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
@@ -18,9 +18,9 @@
package org.apache.flink.compiler.util;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
-public class NoOpFunction extends AbstractFunction {
+public class NoOpFunction extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 571f4e4..10fa34f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,7 +40,6 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
@@ -959,7 +958,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
result1.join(result2)
.where(new IdentityKeyExtractor<String>())
.equalTo(new IdentityKeyExtractor<String>())
- .with(new JoinFunction<String, String, String>() {
+ .with(new RichJoinFunction<String, String, String>() {
@Override
public String join(String first, String second) {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
index 55c53f0..8226dbf 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.dag.TempMode;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -256,7 +255,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
}
- private static class DummyJoiner extends JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
+ private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
@Override
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 1c30545..3624d86 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -21,16 +21,15 @@ package org.apache.flink.compiler;
import java.util.Iterator;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -43,13 +42,13 @@ import org.apache.flink.util.Visitor;
@SuppressWarnings("serial")
public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
- public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+ public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
@Override
public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
}
}
- public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index ff4d6b0..b2c163b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,14 +27,12 @@ import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.compiler.DataStatistics;
-import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.costs.DefaultCostEstimator;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.PlanNode;
@@ -181,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
}
@SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+ public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
List<PlanNode> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
@@ -243,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
}
@SuppressWarnings("unchecked")
- public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
+ public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
List<Operator<?>> nodes = this.map.get(name);
if (nodes == null || nodes.isEmpty()) {
throw new RuntimeException("No node found with the given name and stub class.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index d50a7d6..0fbf072 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -22,8 +22,8 @@ package org.apache.flink.compiler;
import static org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
import static org.junit.Assert.*;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.Order;
@@ -47,7 +47,7 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SourcePlanNode;
import org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.compiler.testfunctions.DummyJoinFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -1426,10 +1426,10 @@ public class FeedbackPropertiesMatchTest {
}
private static final MapNode getMapNode() {
- return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
+ return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
}
private static final MatchNode getJoinNode() {
- return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+ return new MatchNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index f3e513a..b9cc769 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -23,9 +23,9 @@ import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.ReduceGroupOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.util.Collector;
@@ -50,7 +50,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
- data.reduceGroup(new GroupReduceFunction<Double, Double>() {
+ data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
public void reduce(Iterator<Double> values, Collector<Double> out) {}
}).name("reducer")
.print().name("sink");
@@ -94,7 +94,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
- ReduceGroupOperator<Long, Long> reduced = data.reduceGroup(new GroupReduceFunction<Long, Long>() {
+ GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
public void reduce(Iterator<Long> values, Collector<Long> out) {}
}).name("reducer");
@@ -147,7 +147,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
data
.groupBy(1)
- .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
.print().name("sink");
@@ -194,9 +194,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
- ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+ GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(1)
- .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
@@ -255,7 +255,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
- .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
.print().name("sink");
@@ -309,11 +309,11 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
.name("source").setParallelism(6);
- ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+ GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
- .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index d3c3e3f..8fc4324 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.junit.Test;
import java.util.Iterator;
@@ -31,10 +32,9 @@ import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -261,7 +261,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
- public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -269,13 +269,13 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
}
- public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+ public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
@Override
public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
}
- public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
@@ -284,14 +284,14 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
@ConstantFields("0")
- public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+ public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
@Override
public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
}
@ConstantFields("0")
- public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+ public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
@Override
public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
}
- public static final class DuplicateValueScalar<T> extends MapFunction<T, Tuple2<T, T>> {
+ public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> {
@Override
public Tuple2<T, T> map(T value) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index a654872..fb8ae8d 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.compiler;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
import org.apache.flink.api.java.DataSet;
@@ -46,7 +46,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
- data.reduce(new ReduceFunction<Double>() {
+ data.reduce(new RichReduceFunction<Double>() {
@Override
public Double reduce(Double value1, Double value2){
@@ -91,7 +91,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
- data.reduce(new ReduceFunction<Long>() {
+ data.reduce(new RichReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2){
@@ -145,7 +145,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
data
.groupBy(1)
- .reduce(new ReduceFunction<Tuple2<String,Double>>() {
+ .reduce(new RichReduceFunction<Tuple2<String,Double>>() {
@Override
public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
return null;
@@ -205,7 +205,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
.groupBy(new KeySelector<Tuple2<String,Double>, String>() {
public String getKey(Tuple2<String, Double> value) { return value.f0; }
})
- .reduce(new ReduceFunction<Tuple2<String,Double>>() {
+ .reduce(new RichReduceFunction<Tuple2<String,Double>>() {
@Override
public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
return null;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
index 1f653c0..1020c8b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
@@ -174,7 +174,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
});
}
- public static final class DummyFlatMap extends FlatMapFunction<String, Tuple2<String, Integer>> {
+ public static final class DummyFlatMap extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index e04256c..64a4791 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -214,7 +214,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
iter.getWorkset().join(invariantInput)
.where(1, 2)
.equalTo(1, 2)
- .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+ .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return first;
}
@@ -224,7 +224,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
result.join(iter.getSolutionSet())
.where(1, 0)
.equalTo(0, 2)
- .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+ .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return second;
}
@@ -263,7 +263,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
iter.getWorkset().join(invariantInput)
.where(1, 2)
.equalTo(1, 2)
- .with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+ .with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return first;
}
@@ -273,7 +273,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
.join(iter.getSolutionSet())
.where(1, 0)
.equalTo(1, 2)
- .with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+ .with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
return second;
}
@@ -282,7 +282,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
.withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
- .reduceGroup(new GroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
+ .reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
})
.name(NEXT_WORKSET_REDUCER_NAME)
@@ -290,7 +290,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
- joinedWithSolutionSet.map(new MapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
+ joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
joinedWithSolutionSet;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
new file mode 100644
index 0000000..2388db4
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
+import org.apache.flink.util.Collector;
+
+public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void join(T first, T second, Collector<T> out) {
+ out.collect(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
deleted file mode 100644
index 0db075f..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.compiler.testfunctions;
-
-import org.apache.flink.api.java.functions.JoinFunction;
-
-public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public T join(T first, T second) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index fe61f25..42275af 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
import java.util.Iterator;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
@Combinable
-public class IdentityGroupReducer<T> extends GroupReduceFunction<T, T> {
+public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
index b6aa40b..29fc2c8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
@@ -19,9 +19,9 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
-public class IdentityMapper<T> extends MapFunction<T, T> {
+public class IdentityMapper<T> extends RichMapFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
index 91634cc..7ce267f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -18,9 +18,9 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
-public class SelectOneReducer<T> extends ReduceFunction<T> {
+public class SelectOneReducer<T> extends RichReduceFunction<T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 26db00e..3f24e65 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
import java.util.Iterator;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
@Combinable
-public class Top1GroupReducer<T> extends GroupReduceFunction<T, T> {
+public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 51ad75d..736ee14 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -23,14 +23,13 @@ import java.io.Serializable;
import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-public class DummyCrossStub extends CrossFunction implements Serializable {
+public class DummyCrossStub extends CrossFunction {
private static final long serialVersionUID = 1L;
+
@Override
- public void cross(Record record1, Record record2, Collector<Record> out) {
- out.collect(record1);
- out.collect(record2);
+ public Record cross(Record first, Record second) throws Exception {
+ return first;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
deleted file mode 100644
index f4b2763..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An abstract stub implementation for user-defined functions. It offers default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
- */
-public abstract class AbstractFunction implements Function, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // --------------------------------------------------------------------------------------------
- // Runtime context access
- // --------------------------------------------------------------------------------------------
-
- private transient RuntimeContext runtimeContext;
-
- public void setRuntimeContext(RuntimeContext t) {
- if (this.runtimeContext == null) {
- this.runtimeContext = t;
- } else {
- throw new IllegalStateException("Error: The runtime context has already been set.");
- }
- }
-
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return this.runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext == null) {
- throw new IllegalStateException("The runtime context has not been initialized.");
- } else if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Default life cycle methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open(Configuration parameters) throws Exception {}
-
- @Override
- public void close() throws Exception {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
new file mode 100644
index 0000000..07b957d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An abstract stub implementation for user-defined functions. It offers default implementations
+ * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
+ * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ */
+public abstract class AbstractRichFunction implements RichFunction, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime context access
+ // --------------------------------------------------------------------------------------------
+
+ private transient RuntimeContext runtimeContext;
+
+ public void setRuntimeContext(RuntimeContext t) {
+ if (this.runtimeContext == null) {
+ this.runtimeContext = t;
+ } else {
+ throw new IllegalStateException("Error: The runtime context has already been set.");
+ }
+ }
+
+ public RuntimeContext getRuntimeContext() {
+ if (this.runtimeContext != null) {
+ return this.runtimeContext;
+ } else {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ }
+ }
+
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ if (this.runtimeContext == null) {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ } else if (this.runtimeContext instanceof IterationRuntimeContext) {
+ return (IterationRuntimeContext) this.runtimeContext;
+ } else {
+ throw new IllegalStateException("This stub is not part of an iteration step function.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Default life cycle methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
new file mode 100644
index 0000000..5c200af
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+
+public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+
+ /**
+ * This method must be implemented to provide a user implementation of a
+ * coGroup. It is called for each two key-value pairs that share the same
+ * key and come from different inputs.
+ *
+ * @param first The records from the first input which were paired with the key.
+ * @param second The records from the second input which were paired with the key.
+ * @param out A collector that collects all output pairs.
+ */
+ void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
new file mode 100644
index 0000000..d72c4c8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Generic interface used for combiners.
+ */
+public interface CombineFunction<T> extends Function, Serializable {
+
+ T combine(Iterator<T> records) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
new file mode 100644
index 0000000..0c8bc97
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+
+/**
+ * @param <IN1> First input type
+ * @param <IN2> Second input type
+ * @param <OUT> Output type
+ */
+public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+ /**
+ * User defined function for the cross operator.
+ *
+ * @param record1 Record from first input
+ * @param record2 Record from the second input
+ * @return result of cross UDF.
+ * @throws Exception
+ */
+ OUT cross(IN1 record1, IN2 record2) throws Exception;
+
+}