You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:56 UTC
[14/60] git commit: Move RichFunctions to api.common package
Move RichFunctions to api.common package
They were in api.java before, but they can be used from Scala,
just like the regular functions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/568dff12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/568dff12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/568dff12
Branch: refs/heads/master
Commit: 568dff123d7ede05185be60842530076e234b3fa
Parents: a32890a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 3 14:50:42 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200
----------------------------------------------------------------------
.../flink/api/avro/AvroOutputFormatTest.java | 2 +-
.../avro/testjar/AvroExternalJarProgram.java | 4 +-
.../mapred/example/WordCount.java | 4 +-
.../mapreduce/example/WordCount.java | 4 +-
.../spargel/java/VertexCentricIteration.java | 2 +-
.../SpargelPageRankCountingVertices.java | 2 +-
.../SpargelConnectedComponentsITCase.java | 2 +-
.../connectors/json/JSONParseFlatMap.java | 2 +-
.../api/datastream/BatchedDataStream.java | 4 +-
.../api/datastream/ConnectedDataStream.java | 2 +-
.../streaming/api/datastream/DataStream.java | 8 +-
.../api/datastream/GroupedDataStream.java | 2 +-
.../apache/flink/streaming/api/IterateTest.java | 2 +-
.../operator/GroupedBatchGroupReduceTest.java | 2 +-
.../streamcomponent/StreamComponentTest.java | 2 +-
.../serialization/TypeSerializationTest.java | 2 +-
.../ml/IncrementalLearningSkeleton.java | 3 +
.../apache/flink/client/testjar/WordCount.java | 2 +-
.../compiler/BranchingPlansCompilerTest.java | 2 +-
.../CachedMatchStrategyCompilerTest.java | 2 +-
.../compiler/CoGroupSolutionSetFirstTest.java | 4 +-
.../compiler/GroupReduceCompilationTest.java | 2 +-
.../flink/compiler/IterationsCompilerTest.java | 8 +-
.../flink/compiler/ReduceCompilationTest.java | 2 +-
.../compiler/UnionPropertyPropagationTest.java | 2 +-
.../WorksetIterationsJavaApiCompilerTest.java | 6 +-
.../testfunctions/DummyFlatJoinFunction.java | 2 +-
.../testfunctions/IdentityGroupReducer.java | 4 +-
.../compiler/testfunctions/IdentityMapper.java | 2 +-
.../testfunctions/SelectOneReducer.java | 2 +-
.../testfunctions/Top1GroupReducer.java | 4 +-
.../common/functions/RichCoGroupFunction.java | 43 +++++++++
.../api/common/functions/RichCrossFunction.java | 42 +++++++++
.../common/functions/RichFilterFunction.java | 39 +++++++++
.../functions/RichFlatCombineFunction.java | 41 +++++++++
.../common/functions/RichFlatJoinFunction.java | 42 +++++++++
.../common/functions/RichFlatMapFunction.java | 41 +++++++++
.../functions/RichGroupReduceFunction.java | 87 +++++++++++++++++++
.../api/common/functions/RichJoinFunction.java | 41 +++++++++
.../api/common/functions/RichMapFunction.java | 40 +++++++++
.../functions/RichMapPartitionFunction.java | 41 +++++++++
.../common/functions/RichReduceFunction.java | 38 ++++++++
.../flink/example/java/clustering/KMeans.java | 2 +-
.../flink/example/java/ml/LinearRegression.java | 2 +-
.../relational/EmptyFieldsCountAccumulator.java | 2 +-
.../java/org/apache/flink/api/java/DataSet.java | 22 ++---
.../api/java/functions/FlatMapIterator.java | 3 +-
.../api/java/functions/FunctionAnnotation.java | 16 ++--
.../api/java/functions/GroupReduceIterator.java | 1 +
.../api/java/functions/RichCoGroupFunction.java | 43 ---------
.../api/java/functions/RichCrossFunction.java | 42 ---------
.../api/java/functions/RichFilterFunction.java | 39 ---------
.../java/functions/RichFlatCombineFunction.java | 41 ---------
.../java/functions/RichFlatJoinFunction.java | 42 ---------
.../api/java/functions/RichFlatMapFunction.java | 41 ---------
.../java/functions/RichGroupReduceFunction.java | 91 --------------------
.../api/java/functions/RichJoinFunction.java | 41 ---------
.../api/java/functions/RichMapFunction.java | 40 ---------
.../functions/RichMapPartitionFunction.java | 41 ---------
.../api/java/functions/RichReduceFunction.java | 38 --------
.../api/java/operators/AggregateOperator.java | 4 +-
.../api/java/operators/CoGroupOperator.java | 6 +-
.../api/java/operators/DistinctOperator.java | 4 +-
.../api/java/operators/GroupReduceOperator.java | 2 +-
.../flink/api/java/operators/JoinOperator.java | 8 +-
.../java/operators/SingleInputUdfOperator.java | 4 +-
.../api/java/operators/SortedGrouping.java | 4 +-
.../api/java/operators/TwoInputUdfOperator.java | 4 +-
.../flink/api/java/operators/UdfOperator.java | 4 +-
.../api/java/operators/UnsortedGrouping.java | 8 +-
.../translation/KeyExtractingMapper.java | 2 +-
.../translation/KeyRemovingMapper.java | 2 +-
.../PlanUnwrappingReduceGroupOperator.java | 2 +-
.../SemanticPropertiesTranslationTest.java | 2 +
.../DeltaIterationTranslationTest.java | 6 +-
.../translation/ReduceTranslationTests.java | 2 +-
.../java/type/extractor/TypeExtractorTest.java | 12 +--
.../lambdas/LambdaExtractionTest.java | 2 +-
.../runtime/operators/CachedMatchTaskTest.java | 5 +-
.../operators/CoGroupTaskExternalITCase.java | 4 +-
.../runtime/operators/CoGroupTaskTest.java | 4 +-
.../operators/CombineTaskExternalITCase.java | 2 +-
.../runtime/operators/CombineTaskTest.java | 4 +-
.../operators/ReduceTaskExternalITCase.java | 4 +-
.../flink/runtime/operators/ReduceTaskTest.java | 4 +-
.../operators/chaining/ChainTaskTest.java | 2 +-
.../drivers/AllGroupReduceDriverTest.java | 2 +-
.../operators/drivers/AllReduceDriverTest.java | 2 +-
.../drivers/GroupReduceDriverTest.java | 2 +-
.../drivers/ReduceCombineDriverTest.java | 2 +-
.../operators/drivers/ReduceDriverTest.java | 2 +-
.../CombiningUnilateralSortMergerITCase.java | 2 +-
...ultipleJoinsWithSolutionSetCompilerTest.java | 6 +-
.../BulkIterationWithAllReducerITCase.java | 2 +-
.../CoGroupConnectedComponentsSecondITCase.java | 6 +-
.../DependencyConnectedComponentsITCase.java | 6 +-
.../aggregators/AggregatorsITCase.java | 4 +-
...nentsWithParametrizableAggregatorITCase.java | 4 +-
...entsWithParametrizableConvergenceITCase.java | 6 +-
.../test/javaApiOperators/CoGroupITCase.java | 2 +-
.../test/javaApiOperators/CrossITCase.java | 2 +-
.../test/javaApiOperators/DistinctITCase.java | 2 +-
.../test/javaApiOperators/FilterITCase.java | 2 +-
.../test/javaApiOperators/FlatMapITCase.java | 2 +-
.../javaApiOperators/GroupReduceITCase.java | 4 +-
.../flink/test/javaApiOperators/JoinITCase.java | 2 +-
.../flink/test/javaApiOperators/MapITCase.java | 2 +-
.../test/javaApiOperators/ReduceITCase.java | 2 +-
.../test/javaApiOperators/UnionITCase.java | 2 +-
.../flink/test/util/testjar/KMeansForTest.java | 4 +-
110 files changed, 652 insertions(+), 662 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 386f318..7ae8df0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.api.java.record.io.avro.example.User;
import org.apache.flink.api.java.tuple.Tuple3;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 75b7da6..d0e52fd 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
index ba09e77..623b58d 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -18,7 +18,7 @@
package org.apache.flink.hadoopcompatibility.mapred.example;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index c00a14a..33ab97c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -19,8 +19,8 @@
package org.apache.flink.hadoopcompatibility.mapreduce.example;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 777cf9d..37cc549 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.tuple.Tuple2;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 01d2cd7..94946ef 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -21,7 +21,7 @@ package org.apache.flink.spargel.java.examples;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
index 16b004c..948a708 100644
--- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.spargel;
import java.io.BufferedReader;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
index 96b1bf7..64f22ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -17,7 +17,7 @@
package org.apache.flink.streaming.connectors.json;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.sling.commons.json.JSONException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 0249a1f..2565ce1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 9529dcd..6108eec 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0e1ae57..970415b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichFilterFunction;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 8978b19..5261222 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 83f7e8e..fc14256 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api;
import static org.junit.Assert.assertTrue;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
index 096141d..a640893 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.MockInvokable;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index bfc8c7a..3e79401 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index c260077..cf89e2f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.junit.Test;
public class TypeSerializationTest {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 3218c47..d3cd2e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -134,4 +134,7 @@ public class IncrementalLearningSkeleton {
env.execute();
}
+
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 2b64b84..2447602 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -19,7 +19,7 @@
package org.apache.flink.client.testjar;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index fd2f422..63ed907 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.CrossOperator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
index 1deead3..5c27b99 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.compiler.dag.TempMode;
import org.apache.flink.compiler.plan.DualInputPlanNode;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 54af344..7c3fc27 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.compiler;
-import org.apache.flink.api.java.functions.RichCoGroupFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index 9f63683..0441a6f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.compiler;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 562336f..3d423ef 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.compiler;
import static org.junit.Assert.*;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -29,9 +29,9 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index fb8ae8d..5a69b3c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.compiler;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
import org.apache.flink.api.java.DataSet;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
index 1020c8b..f4e5614 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index a7c6152..3e73565 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -29,9 +29,9 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
index 2388db4..1cb3ac0 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.util.Collector;
public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index e06e3ba..4163e33 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -19,8 +19,8 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
index 29fc2c8..e797cf6 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
@@ -19,7 +19,7 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
public class IdentityMapper<T> extends RichMapFunction<T, T> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
index 7ce267f..71764dc 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -18,7 +18,7 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
public class SelectOneReducer<T> extends RichReduceFunction<T> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index b1a0e2d..f1b4e0b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -18,8 +18,8 @@
package org.apache.flink.compiler.testfunctions;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.util.Collector;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
new file mode 100644
index 0000000..b221cb7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
new file mode 100644
index 0000000..38ad961
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CrossFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract OUT cross(IN1 first, IN2 second) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
new file mode 100644
index 0000000..b330866
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The type of the filtered elements.
+ */
+public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract boolean filter(T value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
new file mode 100644
index 0000000..5d6717d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The data type of the elements to be combined.
+ */
+public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
new file mode 100644
index 0000000..e1ec0f1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements FlatJoinFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
new file mode 100644
index 0000000..f3ec4a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatMapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
new file mode 100644
index 0000000..6c7edff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link GroupReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the elements that this function processes.
+ * @param <OUT> The type of the elements returned by the user-defined function.
+ */
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
+
+ /**
+ * The combine method pre-reduces elements. It may be called on subsets of the data
+ * before the actual reduce function. This is often helpful to lower data volume prior
+ * to reorganizing the data in an expensive way, as might be required for the final
+ * reduce function.
+ * <p>
+ * This method is only ever invoked when the subclass of {@link RichGroupReduceFunction}
+ * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining
+ * the <i>reduceGroup</i> operation via
+ * {@link org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean)}.
+ * <p>
+ * Since the reduce function will be called on the result of this method, it is important that this
+ * method returns the same data type as it consumes. By default, this method only calls the
+ * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different
+ * from the final reduce function (for example because the reduce function changes the data type),
+ * this method must be overridden, or the execution will fail.
+ *
+ * @param values The iterator returning the group of values to be reduced.
+ * @param out The collector to emit the returned values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ @Override
+ public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
+ @SuppressWarnings("unchecked")
+ Collector<OUT> c = (Collector<OUT>) out;
+ reduce(values, c);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in order to mark
+ * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)}
+ * method on such functions, to pre-reduce the data before transferring it over the network to
+ * the actual group reduce operation.
+ * <p>
+ * Marking combinable functions as such is in general beneficial for performance.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.TYPE)
+ public static @interface Combinable {};
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
new file mode 100644
index 0000000..9139fc4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link JoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract OUT join(IN1 first, IN2 second) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
new file mode 100644
index 0000000..30b6666
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link MapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract OUT map(IN value) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
new file mode 100644
index 0000000..7fce2a8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link MapPartitionFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
new file mode 100644
index 0000000..c630510
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link ReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> Type of the elements that this function processes.
+ */
+public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public abstract T reduce(T value1, T value2) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
index 3c31af6..3bea458 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
index d65a809..6ef6270 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
@@ -24,7 +24,7 @@ import java.util.Collection;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.example.java.ml.util.LinearRegressionData;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
index 4bced17..a379bf8 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 3dcd780..4c111ff 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -132,13 +132,13 @@ public abstract class DataSet<T> {
/**
* Applies a Map transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
+ * The transformation calls a {@link org.apache.flink.api.common.functions.RichMapFunction} for each element of the DataSet.
* Each MapFunction call returns exactly one element.
*
* @param mapper The MapFunction that is called for each element of the DataSet.
* @return A MapOperator that represents the transformed DataSet.
*
- * @see org.apache.flink.api.java.functions.RichMapFunction
+ * @see org.apache.flink.api.common.functions.RichMapFunction
* @see MapOperator
* @see DataSet
*/
@@ -185,13 +185,13 @@ public abstract class DataSet<T> {
/**
* Applies a FlatMap transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet.
+ * The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
* Each FlatMapFunction call can return any number of elements including none.
*
* @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
* @return A FlatMapOperator that represents the transformed DataSet.
*
- * @see org.apache.flink.api.java.functions.RichFlatMapFunction
+ * @see org.apache.flink.api.common.functions.RichFlatMapFunction
* @see FlatMapOperator
* @see DataSet
*/
@@ -208,14 +208,14 @@ public abstract class DataSet<T> {
/**
* Applies a Filter transformation on a {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet
+ * The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet
* and retains only those element for which the function returns true. Elements for
* which the function returns false are filtered.
*
* @param filter The FilterFunction that is called for each element of the DataSet.
* @return A FilterOperator that represents the filtered DataSet.
*
- * @see org.apache.flink.api.java.functions.RichFilterFunction
+ * @see org.apache.flink.api.common.functions.RichFilterFunction
* @see FilterOperator
* @see DataSet
*/
@@ -311,14 +311,14 @@ public abstract class DataSet<T> {
/**
* Applies a Reduce transformation on a non-grouped {@link DataSet}.<br/>
- * The transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction}
+ * The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
* until only a single element remains which is the result of the transformation.
* A ReduceFunction combines two elements into one new element of the same type.
*
* @param reducer The ReduceFunction that is applied on the DataSet.
* @return A ReduceOperator that represents the reduced DataSet.
*
- * @see org.apache.flink.api.java.functions.RichReduceFunction
+ * @see org.apache.flink.api.common.functions.RichReduceFunction
* @see ReduceOperator
* @see DataSet
*/
@@ -331,14 +331,14 @@ public abstract class DataSet<T> {
/**
* Applies a GroupReduce transformation on a non-grouped {@link DataSet}.<br/>
- * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} once with the full DataSet.
+ * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
* The GroupReduceFunction can iterate over all elements of the DataSet and emit any
* number of output elements including none.
*
* @param reducer The GroupReduceFunction that is applied on the DataSet.
* @return A GroupReduceOperator that represents the reduced DataSet.
*
- * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+ * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
* @see org.apache.flink.api.java.operators.GroupReduceOperator
* @see DataSet
*/
@@ -600,7 +600,7 @@ public abstract class DataSet<T> {
* Initiates a CoGroup transformation.<br/>
* A CoGroup transformation combines the elements of
* two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and
- * gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
+ * gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}.
* If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction
* is called with an empty group for the non-existing group.</br>
* The CoGroupFunction can iterate over the elements of both groups and return any number
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 0a83235..2030de3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -20,10 +20,11 @@ package org.apache.flink.api.java.functions;
import java.util.Iterator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
/**
- * A convenience variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then
+ * A convenience variant of the {@link org.apache.flink.api.common.functions.RichFlatMapFunction} that returns elements through an iterator, rather than
* through a collector. In all other respects, it behaves exactly like the FlatMapFunction.
* <p>
* The function needs to be serializable, as defined in {@link java.io.Serializable}.