You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:56 UTC

[14/60] git commit: Move RichFunctions to api.common package

Move RichFunctions to api.common package

They were in api.java before but they can be used from Scala,
just like the regular functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/568dff12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/568dff12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/568dff12

Branch: refs/heads/master
Commit: 568dff123d7ede05185be60842530076e234b3fa
Parents: a32890a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 3 14:50:42 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatTest.java    |  2 +-
 .../avro/testjar/AvroExternalJarProgram.java    |  4 +-
 .../mapred/example/WordCount.java               |  4 +-
 .../mapreduce/example/WordCount.java            |  4 +-
 .../spargel/java/VertexCentricIteration.java    |  2 +-
 .../SpargelPageRankCountingVertices.java        |  2 +-
 .../SpargelConnectedComponentsITCase.java       |  2 +-
 .../connectors/json/JSONParseFlatMap.java       |  2 +-
 .../api/datastream/BatchedDataStream.java       |  4 +-
 .../api/datastream/ConnectedDataStream.java     |  2 +-
 .../streaming/api/datastream/DataStream.java    |  8 +-
 .../api/datastream/GroupedDataStream.java       |  2 +-
 .../apache/flink/streaming/api/IterateTest.java |  2 +-
 .../operator/GroupedBatchGroupReduceTest.java   |  2 +-
 .../streamcomponent/StreamComponentTest.java    |  2 +-
 .../serialization/TypeSerializationTest.java    |  2 +-
 .../ml/IncrementalLearningSkeleton.java         |  3 +
 .../apache/flink/client/testjar/WordCount.java  |  2 +-
 .../compiler/BranchingPlansCompilerTest.java    |  2 +-
 .../CachedMatchStrategyCompilerTest.java        |  2 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |  4 +-
 .../compiler/GroupReduceCompilationTest.java    |  2 +-
 .../flink/compiler/IterationsCompilerTest.java  |  8 +-
 .../flink/compiler/ReduceCompilationTest.java   |  2 +-
 .../compiler/UnionPropertyPropagationTest.java  |  2 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  6 +-
 .../testfunctions/DummyFlatJoinFunction.java    |  2 +-
 .../testfunctions/IdentityGroupReducer.java     |  4 +-
 .../compiler/testfunctions/IdentityMapper.java  |  2 +-
 .../testfunctions/SelectOneReducer.java         |  2 +-
 .../testfunctions/Top1GroupReducer.java         |  4 +-
 .../common/functions/RichCoGroupFunction.java   | 43 +++++++++
 .../api/common/functions/RichCrossFunction.java | 42 +++++++++
 .../common/functions/RichFilterFunction.java    | 39 +++++++++
 .../functions/RichFlatCombineFunction.java      | 41 +++++++++
 .../common/functions/RichFlatJoinFunction.java  | 42 +++++++++
 .../common/functions/RichFlatMapFunction.java   | 41 +++++++++
 .../functions/RichGroupReduceFunction.java      | 87 +++++++++++++++++++
 .../api/common/functions/RichJoinFunction.java  | 41 +++++++++
 .../api/common/functions/RichMapFunction.java   | 40 +++++++++
 .../functions/RichMapPartitionFunction.java     | 41 +++++++++
 .../common/functions/RichReduceFunction.java    | 38 ++++++++
 .../flink/example/java/clustering/KMeans.java   |  2 +-
 .../flink/example/java/ml/LinearRegression.java |  2 +-
 .../relational/EmptyFieldsCountAccumulator.java |  2 +-
 .../java/org/apache/flink/api/java/DataSet.java | 22 ++---
 .../api/java/functions/FlatMapIterator.java     |  3 +-
 .../api/java/functions/FunctionAnnotation.java  | 16 ++--
 .../api/java/functions/GroupReduceIterator.java |  1 +
 .../api/java/functions/RichCoGroupFunction.java | 43 ---------
 .../api/java/functions/RichCrossFunction.java   | 42 ---------
 .../api/java/functions/RichFilterFunction.java  | 39 ---------
 .../java/functions/RichFlatCombineFunction.java | 41 ---------
 .../java/functions/RichFlatJoinFunction.java    | 42 ---------
 .../api/java/functions/RichFlatMapFunction.java | 41 ---------
 .../java/functions/RichGroupReduceFunction.java | 91 --------------------
 .../api/java/functions/RichJoinFunction.java    | 41 ---------
 .../api/java/functions/RichMapFunction.java     | 40 ---------
 .../functions/RichMapPartitionFunction.java     | 41 ---------
 .../api/java/functions/RichReduceFunction.java  | 38 --------
 .../api/java/operators/AggregateOperator.java   |  4 +-
 .../api/java/operators/CoGroupOperator.java     |  6 +-
 .../api/java/operators/DistinctOperator.java    |  4 +-
 .../api/java/operators/GroupReduceOperator.java |  2 +-
 .../flink/api/java/operators/JoinOperator.java  |  8 +-
 .../java/operators/SingleInputUdfOperator.java  |  4 +-
 .../api/java/operators/SortedGrouping.java      |  4 +-
 .../api/java/operators/TwoInputUdfOperator.java |  4 +-
 .../flink/api/java/operators/UdfOperator.java   |  4 +-
 .../api/java/operators/UnsortedGrouping.java    |  8 +-
 .../translation/KeyExtractingMapper.java        |  2 +-
 .../translation/KeyRemovingMapper.java          |  2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |  2 +-
 .../SemanticPropertiesTranslationTest.java      |  2 +
 .../DeltaIterationTranslationTest.java          |  6 +-
 .../translation/ReduceTranslationTests.java     |  2 +-
 .../java/type/extractor/TypeExtractorTest.java  | 12 +--
 .../lambdas/LambdaExtractionTest.java           |  2 +-
 .../runtime/operators/CachedMatchTaskTest.java  |  5 +-
 .../operators/CoGroupTaskExternalITCase.java    |  4 +-
 .../runtime/operators/CoGroupTaskTest.java      |  4 +-
 .../operators/CombineTaskExternalITCase.java    |  2 +-
 .../runtime/operators/CombineTaskTest.java      |  4 +-
 .../operators/ReduceTaskExternalITCase.java     |  4 +-
 .../flink/runtime/operators/ReduceTaskTest.java |  4 +-
 .../operators/chaining/ChainTaskTest.java       |  2 +-
 .../drivers/AllGroupReduceDriverTest.java       |  2 +-
 .../operators/drivers/AllReduceDriverTest.java  |  2 +-
 .../drivers/GroupReduceDriverTest.java          |  2 +-
 .../drivers/ReduceCombineDriverTest.java        |  2 +-
 .../operators/drivers/ReduceDriverTest.java     |  2 +-
 .../CombiningUnilateralSortMergerITCase.java    |  2 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |  6 +-
 .../BulkIterationWithAllReducerITCase.java      |  2 +-
 .../CoGroupConnectedComponentsSecondITCase.java |  6 +-
 .../DependencyConnectedComponentsITCase.java    |  6 +-
 .../aggregators/AggregatorsITCase.java          |  4 +-
 ...nentsWithParametrizableAggregatorITCase.java |  4 +-
 ...entsWithParametrizableConvergenceITCase.java |  6 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  2 +-
 .../test/javaApiOperators/CrossITCase.java      |  2 +-
 .../test/javaApiOperators/DistinctITCase.java   |  2 +-
 .../test/javaApiOperators/FilterITCase.java     |  2 +-
 .../test/javaApiOperators/FlatMapITCase.java    |  2 +-
 .../javaApiOperators/GroupReduceITCase.java     |  4 +-
 .../flink/test/javaApiOperators/JoinITCase.java |  2 +-
 .../flink/test/javaApiOperators/MapITCase.java  |  2 +-
 .../test/javaApiOperators/ReduceITCase.java     |  2 +-
 .../test/javaApiOperators/UnionITCase.java      |  2 +-
 .../flink/test/util/testjar/KMeansForTest.java  |  4 +-
 110 files changed, 652 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 386f318..7ae8df0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
 import org.apache.flink.api.java.record.io.avro.example.User;
 import org.apache.flink.api.java.tuple.Tuple3;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 75b7da6..d0e52fd 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
index ba09e77..623b58d 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.hadoopcompatibility.mapred.example;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index c00a14a..33ab97c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -19,8 +19,8 @@
 package org.apache.flink.hadoopcompatibility.mapreduce.example;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 777cf9d..37cc549 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 01d2cd7..94946ef 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -21,7 +21,7 @@ package org.apache.flink.spargel.java.examples;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
index 16b004c..948a708 100644
--- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.spargel;
 
 import java.io.BufferedReader;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
index 96b1bf7..64f22ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.json;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.sling.commons.json.JSONException;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
index 0249a1f..2565ce1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 9529dcd..6108eec 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0e1ae57..970415b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichFilterFunction;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.JobGraphBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 8978b19..5261222 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 83f7e8e..fc14256 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api;
 
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
index 096141d..a640893 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.MockInvokable;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index bfc8c7a..3e79401 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index c260077..cf89e2f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.junit.Test;
 
 public class TypeSerializationTest {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 3218c47..d3cd2e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -134,4 +134,7 @@ public class IncrementalLearningSkeleton {
 
 		env.execute();
 	}
+
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 2b64b84..2447602 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -19,7 +19,7 @@
 package org.apache.flink.client.testjar;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index fd2f422..63ed907 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.CrossOperator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
index 1deead3..5c27b99 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.compiler.dag.TempMode;
 import org.apache.flink.compiler.plan.DualInputPlanNode;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 54af344..7c3fc27 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.compiler;
 
-import org.apache.flink.api.java.functions.RichCoGroupFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index 9f63683..0441a6f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.compiler;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 562336f..3d423ef 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.junit.Test;
 
 import org.apache.flink.api.common.Plan;
@@ -29,9 +29,9 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index fb8ae8d..5a69b3c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.compiler;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
index 1020c8b..f4e5614 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index a7c6152..3e73565 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -29,9 +29,9 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichJoinFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
index 2388db4..1cb3ac0 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.util.Collector;
 
 public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index e06e3ba..4163e33 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -19,8 +19,8 @@
 package org.apache.flink.compiler.testfunctions;
 
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
index 29fc2c8..e797cf6 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 
 public class IdentityMapper<T> extends RichMapFunction<T, T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
index 7ce267f..71764dc 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 
 public class SelectOneReducer<T> extends RichReduceFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index b1a0e2d..f1b4e0b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
new file mode 100644
index 0000000..b221cb7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCoGroupFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	
+	@Override
+	public abstract void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
new file mode 100644
index 0000000..38ad961
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichCrossFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CrossFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCrossFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
+	
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract OUT cross(IN1 first, IN2 second) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
new file mode 100644
index 0000000..b330866
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFilterFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <T> The type of the filtered elements.
+ */
+public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
+	
+	private static final long serialVersionUID = 1L;
+	
+	@Override
+	public abstract boolean filter(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
new file mode 100644
index 0000000..5d6717d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatCombineFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The data type of the elements to be combined.
+ */
+public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction implements FlatCombineFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void combine(Iterable<T> values, Collector<T> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
new file mode 100644
index 0000000..e1ec0f1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatJoinFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements FlatJoinFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
new file mode 100644
index 0000000..f3ec4a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFlatMapFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link FlatMapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
new file mode 100644
index 0000000..6c7edff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link GroupReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN> Type of the elements that this function processes.
+ * @param <OUT> The type of the elements returned by the user-defined function.
+ */
+public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, FlatCombineFunction<IN> {
+	
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception;
+	
+	/**
+	 * The combine methods pre-reduces elements. It may be called on subsets of the data
+	 * before the actual reduce function. This is often helpful to lower data volume prior
+	 * to reorganizing the data in an expensive way, as might be required for the final
+	 * reduce function.
+	 * <p>
+	 * This method is only ever invoked when the subclass of {@link RichGroupReduceFunction}
+	 * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining
+	 * the <i>reduceGroup<i> operation via
+	 * {@link org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean)}.
+	 * <p>
+	 * Since the reduce function will be called on the result of this method, it is important that this
+	 * method returns the same data type as it consumes. By default, this method only calls the
+	 * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different
+	 * from the final reduce function (for example because the reduce function changes the data type),
+	 * this method must be overwritten, or the execution will fail.
+	 * 
+	 * @param values The iterator returning the group of values to be reduced.
+	 * @param out The collector to emit the returned values.
+	 * 
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	@Override
+	public void combine(Iterable<IN> values, Collector<IN> out) throws Exception {
+		@SuppressWarnings("unchecked")
+		Collector<OUT> c = (Collector<OUT>) out;
+		reduce(values, c);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in oder to mark
+	 * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)}
+	 * method on such functions, to pre-reduce the data before transferring it over the network to
+	 * the actual group reduce operation.
+	 * <p>
+	 * Marking combinable functions as such is in general beneficial for performance.
+	 */
+	@Retention(RetentionPolicy.RUNTIME)
+	@Target(ElementType.TYPE)
+	public static @interface Combinable {};
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
new file mode 100644
index 0000000..9139fc4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichJoinFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link JoinFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichJoinFunction<IN1,IN2,OUT> extends AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract OUT join(IN1 first, IN2 second) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
new file mode 100644
index 0000000..30b6666
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link MapFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public abstract OUT map(IN value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
new file mode 100644
index 0000000..7fce2a8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapPartitionFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Rich variant of the {@link MapPartitionFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+public abstract class RichMapPartitionFunction<I, O> extends AbstractRichFunction implements MapPartitionFunction<I, O> {
+
+	private static final long serialVersionUID = 1L;
+	
+	@Override
+	public abstract void mapPartition(Iterable<I> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
new file mode 100644
index 0000000..c630510
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichReduceFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link ReduceFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ * 
+ * @param <T> Type of the elements that this function processes.
+ */
+public abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {
+	
+	private static final long serialVersionUID = 1L;
+
+	public abstract T reduce(T value1, T value2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
index 3c31af6..3bea458 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
index d65a809..6ef6270 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.example.java.ml.util.LinearRegressionData;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
index 4bced17..a379bf8 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.RichFilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 3dcd780..4c111ff 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -132,13 +132,13 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a Map transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
+	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichMapFunction} for each element of the DataSet.
 	 * Each MapFunction call returns exactly one element.
 	 * 
 	 * @param mapper The MapFunction that is called for each element of the DataSet.
 	 * @return A MapOperator that represents the transformed DataSet.
 	 * 
-	 * @see org.apache.flink.api.java.functions.RichMapFunction
+	 * @see org.apache.flink.api.common.functions.RichMapFunction
 	 * @see MapOperator
 	 * @see DataSet
 	 */
@@ -185,13 +185,13 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a FlatMap transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of the DataSet.
+	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 	 * Each FlatMapFunction call can return any number of elements including none.
 	 * 
 	 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. 
 	 * @return A FlatMapOperator that represents the transformed DataSet.
 	 * 
-	 * @see org.apache.flink.api.java.functions.RichFlatMapFunction
+	 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 	 * @see FlatMapOperator
 	 * @see DataSet
 	 */
@@ -208,14 +208,14 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a Filter transformation on a {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichFilterFunction} for each element of the DataSet
+	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet
 	 * and retains only those element for which the function returns true. Elements for 
 	 * which the function returns false are filtered. 
 	 * 
 	 * @param filter The FilterFunction that is called for each element of the DataSet.
 	 * @return A FilterOperator that represents the filtered DataSet.
 	 * 
-	 * @see org.apache.flink.api.java.functions.RichFilterFunction
+	 * @see org.apache.flink.api.common.functions.RichFilterFunction
 	 * @see FilterOperator
 	 * @see DataSet
 	 */
@@ -311,14 +311,14 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a Reduce transformation on a non-grouped {@link DataSet}.<br/>
-	 * The transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction}
+	 * The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
 	 *   until only a single element remains which is the result of the transformation.
 	 * A ReduceFunction combines two elements into one new element of the same type.
 	 * 
 	 * @param reducer The ReduceFunction that is applied on the DataSet.
 	 * @return A ReduceOperator that represents the reduced DataSet.
 	 * 
-	 * @see org.apache.flink.api.java.functions.RichReduceFunction
+	 * @see org.apache.flink.api.common.functions.RichReduceFunction
 	 * @see ReduceOperator
 	 * @see DataSet
 	 */
@@ -331,14 +331,14 @@ public abstract class DataSet<T> {
 	
 	/**
 	 * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.<br/>
-	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} once with the full DataSet.
+	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet.
 	 * The GroupReduceFunction can iterate over all elements of the DataSet and emit any
 	 *   number of output elements including none.
 	 * 
 	 * @param reducer The GroupReduceFunction that is applied on the DataSet.
 	 * @return A GroupReduceOperator that represents the reduced DataSet.
 	 * 
-	 * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+	 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 	 * @see DataSet
 	 */
@@ -600,7 +600,7 @@ public abstract class DataSet<T> {
 	 * Initiates a CoGroup transformation.<br/>
 	 * A CoGroup transformation combines the elements of
 	 *   two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and 
-	 *   gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
+	 *   gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}.
 	 *   If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction
 	 *   is called with an empty group for the non-existing group.</br>
 	 * The CoGroupFunction can iterate over the elements of both groups and return any number 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/568dff12/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 0a83235..2030de3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -20,10 +20,11 @@ package org.apache.flink.api.java.functions;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * A convenience variant of the {@link RichFlatMapFunction} that returns elements through an iterator, rather then
+ * A convenience variant of the {@link org.apache.flink.api.common.functions.RichFlatMapFunction} that returns elements through an iterator, rather then
  * through a collector. In all other respects, it behaves exactly like the FlatMapFunction.
  * <p>
  * The function needs to be serializable, as defined in {@link java.io.Serializable}.