You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:30 UTC

[14/22] git commit: [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

[FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22b24f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22b24f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22b24f20

Branch: refs/heads/release-0.6
Commit: 22b24f208de2501311fe504e767526690742e30e
Parents: 9d1c49f
Author: Kostas Tzoumas <Ko...@gmail.com>
Authored: Fri Jul 18 15:37:19 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 17:47:35 2014 +0200

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatTest.java    |   6 +-
 .../avro/testjar/AvroExternalJarProgram.java    |   8 +-
 .../mapred/example/WordCount.java               |   8 +-
 .../mapreduce/example/WordCount.java            |   8 +-
 .../spargel/java/VertexCentricIteration.java    |   8 +-
 .../examples/SpargelConnectedComponents.java    |   4 +-
 .../spargel/java/examples/SpargelPageRank.java  |   8 +-
 .../SpargelPageRankCountingVertices.java        |  14 +-
 .../SpargelConnectedComponentsITCase.java       |   4 +-
 .../apache/flink/client/testjar/WordCount.java  |   4 +-
 .../flink/compiler/util/NoOpFunction.java       |   4 +-
 .../compiler/BranchingPlansCompilerTest.java    |   5 +-
 .../CachedMatchStrategyCompilerTest.java        |   5 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |   9 +-
 .../apache/flink/compiler/CompilerTestBase.java |   8 +-
 .../compiler/FeedbackPropertiesMatchTest.java   |  10 +-
 .../compiler/GroupReduceCompilationTest.java    |  20 +-
 .../flink/compiler/IterationsCompilerTest.java  |  20 +-
 .../flink/compiler/ReduceCompilationTest.java   |  10 +-
 .../compiler/UnionPropertyPropagationTest.java  |   4 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  18 +-
 .../testfunctions/DummyFlatJoinFunction.java    |  33 ++
 .../testfunctions/DummyJoinFunction.java        |  32 -
 .../testfunctions/IdentityGroupReducer.java     |   6 +-
 .../compiler/testfunctions/IdentityMapper.java  |   4 +-
 .../testfunctions/SelectOneReducer.java         |   4 +-
 .../testfunctions/Top1GroupReducer.java         |   6 +-
 .../flink/compiler/util/DummyCrossStub.java     |   9 +-
 .../api/common/functions/AbstractFunction.java  |  76 ---
 .../common/functions/AbstractRichFunction.java  |  76 +++
 .../api/common/functions/CoGroupFunction.java   |  41 ++
 .../api/common/functions/CombineFunction.java   |  31 +
 .../api/common/functions/CrossFunction.java     |  42 ++
 .../api/common/functions/FilterFunction.java    |  33 ++
 .../common/functions/FlatCombineFunction.java   |  33 ++
 .../api/common/functions/FlatJoinFunction.java  |  30 +
 .../api/common/functions/FlatMapFunction.java   |  45 ++
 .../flink/api/common/functions/Function.java    |  75 +--
 .../api/common/functions/GenericCoGrouper.java  |  40 --
 .../common/functions/GenericCollectorMap.java   |   2 +-
 .../api/common/functions/GenericCombine.java    |  32 -
 .../api/common/functions/GenericCrosser.java    |  41 --
 .../api/common/functions/GenericFilter.java     |  33 --
 .../api/common/functions/GenericFlatMap.java    |  42 --
 .../common/functions/GenericGroupReduce.java    |  44 --
 .../api/common/functions/GenericJoiner.java     |  28 -
 .../flink/api/common/functions/GenericMap.java  |  30 -
 .../api/common/functions/GenericReduce.java     |  26 -
 .../common/functions/GroupReduceFunction.java   |  45 ++
 .../api/common/functions/JoinFunction.java      |  28 +
 .../flink/api/common/functions/MapFunction.java |  38 ++
 .../api/common/functions/ReduceFunction.java    |  39 ++
 .../api/common/functions/RichFunction.java      |  97 ++++
 .../api/common/functions/RuntimeContext.java    |   2 +-
 .../common/functions/util/FunctionUtils.java    |  81 +++
 .../flink/api/common/operators/Union.java       |   6 +-
 .../operators/base/BulkIterationBase.java       |   8 +-
 .../operators/base/CoGroupOperatorBase.java     |   6 +-
 .../operators/base/CrossOperatorBase.java       |   6 +-
 .../operators/base/DeltaIterationBase.java      |   6 +-
 .../operators/base/FilterOperatorBase.java      |   6 +-
 .../operators/base/FlatMapOperatorBase.java     |   6 +-
 .../operators/base/GroupReduceOperatorBase.java |  17 +-
 .../common/operators/base/JoinOperatorBase.java |   6 +-
 .../common/operators/base/MapOperatorBase.java  |   4 +-
 .../operators/base/ReduceOperatorBase.java      |   6 +-
 .../api/common/operators/util/OperatorUtil.java |  18 +-
 .../common/operators/util/OperatorUtilTest.java |  21 +-
 flink-examples/flink-java-examples/pom.xml      |   1 -
 .../flink/example/java/clustering/KMeans.java   |  17 +-
 .../example/java/graph/ConnectedComponents.java |  24 +-
 .../example/java/graph/EnumTrianglesBasic.java  |  14 +-
 .../example/java/graph/EnumTrianglesOpt.java    |  28 +-
 .../flink/example/java/graph/PageRankBasic.java |  18 +-
 .../java/graph/TransitiveClosureNaive.java      |   4 +-
 .../flink/example/java/ml/LinearRegression.java |  15 +-
 .../relational/EmptyFieldsCountAccumulator.java |   4 +-
 .../java/relational/RelationalQuery.java        |   2 +-
 .../example/java/relational/TPCHQuery10.java    |   4 +-
 .../example/java/relational/TPCHQuery3.java     |   4 +-
 .../example/java/relational/WebLogAnalysis.java |  12 +-
 .../flink/example/java/wordcount/WordCount.java |   4 +-
 .../example/java/wordcount/WordCountPOJO.java   |   6 +-
 flink-examples/pom.xml                          |   1 -
 .../java/org/apache/flink/api/java/DataSet.java |  93 +--
 .../apache/flink/api/java/DeltaIteration.java   |   2 +-
 .../flink/api/java/ExecutionEnvironment.java    |   6 +-
 .../apache/flink/api/java/IterativeDataSet.java |   2 +-
 .../api/java/functions/CoGroupFunction.java     |  74 ---
 .../flink/api/java/functions/CrossFunction.java |  76 ---
 .../api/java/functions/FilterFunction.java      |  57 --
 .../api/java/functions/FlatMapFunction.java     |  59 --
 .../api/java/functions/FlatMapIterator.java     |   4 +-
 .../api/java/functions/FunctionAnnotation.java  |  16 +-
 .../api/java/functions/GroupReduceFunction.java | 114 ----
 .../api/java/functions/GroupReduceIterator.java |   2 +-
 .../flink/api/java/functions/JoinFunction.java  |  93 ---
 .../flink/api/java/functions/MapFunction.java   |  59 --
 .../api/java/functions/ReduceFunction.java      |  63 --
 .../api/java/functions/RichCoGroupFunction.java |  74 +++
 .../api/java/functions/RichCrossFunction.java   |  67 +++
 .../api/java/functions/RichFilterFunction.java  |  57 ++
 .../java/functions/RichFlatCombineFunction.java |  34 ++
 .../java/functions/RichFlatJoinFunction.java    |  75 +++
 .../api/java/functions/RichFlatMapFunction.java |  59 ++
 .../java/functions/RichGroupReduceFunction.java | 114 ++++
 .../api/java/functions/RichJoinFunction.java    |  31 +
 .../api/java/functions/RichMapFunction.java     |  59 ++
 .../api/java/functions/RichReduceFunction.java  |  63 ++
 .../UnsupportedLambdaExpressionException.java   |  30 +
 .../api/java/operators/AggregateOperator.java   |  20 +-
 .../api/java/operators/CoGroupOperator.java     |  52 +-
 .../flink/api/java/operators/CrossOperator.java |  18 +-
 .../api/java/operators/DistinctOperator.java    |  44 +-
 .../api/java/operators/FilterOperator.java      |   6 +-
 .../api/java/operators/FlatMapOperator.java     |   7 +-
 .../api/java/operators/GroupReduceOperator.java | 224 +++++++
 .../flink/api/java/operators/Grouping.java      |   4 +-
 .../flink/api/java/operators/JoinOperator.java  | 163 ++++--
 .../flink/api/java/operators/MapOperator.java   |  10 +-
 .../api/java/operators/ProjectOperator.java     |   4 +-
 .../api/java/operators/ReduceGroupOperator.java | 215 -------
 .../api/java/operators/ReduceOperator.java      |  19 +-
 .../java/operators/SingleInputUdfOperator.java  |   4 +-
 .../api/java/operators/SortedGrouping.java      |  20 +-
 .../api/java/operators/TwoInputUdfOperator.java |   4 +-
 .../flink/api/java/operators/UdfOperator.java   |  10 +-
 .../api/java/operators/UnsortedGrouping.java    |  24 +-
 .../translation/KeyExtractingMapper.java        |   4 +-
 .../translation/KeyRemovingMapper.java          |   4 +-
 .../translation/PlanFilterOperator.java         |   8 +-
 .../translation/PlanProjectOperator.java        |  10 +-
 .../PlanUnwrappingCoGroupOperator.java          |  13 +-
 .../translation/PlanUnwrappingJoinOperator.java |  28 +-
 .../PlanUnwrappingReduceGroupOperator.java      |  21 +-
 .../PlanUnwrappingReduceOperator.java           |   7 +-
 .../translation/TupleKeyExtractingMapper.java   |   4 +-
 .../operators/translation/WrappingFunction.java |  22 +-
 .../java/record/functions/CoGroupFunction.java  |   5 +-
 .../java/record/functions/CrossFunction.java    |  19 +-
 .../api/java/record/functions/JoinFunction.java |   8 +-
 .../api/java/record/functions/MapFunction.java  |   4 +-
 .../java/record/functions/ReduceFunction.java   |   8 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  66 ++-
 .../java/typeutils/runtime/TupleComparator.java |   2 +-
 .../SemanticPropertiesTranslationTest.java      |  12 +-
 .../DeltaIterationTranslationTest.java          |  26 +-
 .../translation/ReduceTranslationTests.java     |  10 +-
 .../java/type/extractor/TypeExtractorTest.java  | 126 ++--
 flink-java8-tests/pom.xml                       | 145 +++++
 .../javaApiOperators/lambdas/CoGroupITCase.java |  70 +++
 .../javaApiOperators/lambdas/CrossITCase.java   |  59 ++
 .../javaApiOperators/lambdas/FilterITCase.java  | 142 +++++
 .../lambdas/FlatJoinITCase.java                 |  58 ++
 .../javaApiOperators/lambdas/FlatMapITCase.java |  46 ++
 .../lambdas/GroupReduceITCase.java              |  84 +++
 .../javaApiOperators/lambdas/JoinITCase.java    |  58 ++
 .../javaApiOperators/lambdas/MapITCase.java     |  48 ++
 .../javaApiOperators/lambdas/ReduceITCase.java  | 160 +++++
 .../task/AbstractIterativePactTask.java         |   4 +-
 .../iterative/task/IterationHeadPactTask.java   |   4 +-
 .../task/IterationIntermediatePactTask.java     |   4 +-
 .../iterative/task/IterationTailPactTask.java   |   4 +-
 .../AbstractCachedBuildSideMatchDriver.java     |   6 +-
 .../runtime/operators/AllGroupReduceDriver.java |  24 +-
 .../runtime/operators/AllReduceDriver.java      |  16 +-
 .../flink/runtime/operators/CoGroupDriver.java  |  14 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |  14 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |  14 +-
 .../flink/runtime/operators/CrossDriver.java    |  32 +-
 .../flink/runtime/operators/FlatMapDriver.java  |  16 +-
 .../operators/GroupReduceCombineDriver.java     |  14 +-
 .../runtime/operators/GroupReduceDriver.java    |  16 +-
 .../JoinWithSolutionSetFirstDriver.java         |  14 +-
 .../JoinWithSolutionSetSecondDriver.java        |  14 +-
 .../flink/runtime/operators/MapDriver.java      |  16 +-
 .../flink/runtime/operators/MatchDriver.java    |  16 +-
 .../flink/runtime/operators/NoOpDriver.java     |  10 +-
 .../runtime/operators/ReduceCombineDriver.java  |  16 +-
 .../flink/runtime/operators/ReduceDriver.java   |  16 +-
 .../runtime/operators/RegularPactTask.java      |  42 +-
 .../chaining/ChainedCollectorMapDriver.java     |   4 +-
 .../chaining/ChainedFlatMapDriver.java          |  17 +-
 .../operators/chaining/ChainedMapDriver.java    |  14 +-
 .../ChainedTerminationCriterionDriver.java      |   4 +-
 .../SynchronousChainedCombineDriver.java        |  13 +-
 .../hash/BuildFirstHashMatchIterator.java       |   4 +-
 .../hash/BuildSecondHashMatchIterator.java      |   4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  19 +-
 .../operators/sort/MergeMatchIterator.java      |  14 +-
 .../operators/util/JoinTaskIterator.java        |   4 +-
 .../runtime/operators/CachedMatchTaskTest.java  |   7 +-
 .../operators/CoGroupTaskExternalITCase.java    |   9 +-
 .../runtime/operators/CoGroupTaskTest.java      |  11 +-
 .../operators/CombineTaskExternalITCase.java    |   6 +-
 .../runtime/operators/CombineTaskTest.java      |   6 +-
 .../operators/CrossTaskExternalITCase.java      |   6 +-
 .../flink/runtime/operators/CrossTaskTest.java  |  22 +-
 .../operators/MatchTaskExternalITCase.java      |   6 +-
 .../flink/runtime/operators/MatchTaskTest.java  |  14 +-
 .../operators/ReduceTaskExternalITCase.java     |   6 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   6 +-
 .../drivers/AllGroupReduceDriverTest.java       |  20 +-
 .../operators/drivers/AllReduceDriverTest.java  |  32 +-
 .../drivers/GroupReduceDriverTest.java          |  20 +-
 .../drivers/ReduceCombineDriverTest.java        |  32 +-
 .../operators/drivers/ReduceDriverTest.java     |  32 +-
 .../operators/hash/HashMatchIteratorITCase.java |  18 +-
 .../sort/SortMergeMatchIteratorITCase.java      |   4 +-
 .../operators/testutils/DriverTestBase.java     |   8 +-
 .../operators/testutils/TaskTestBase.java       |   6 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   4 +-
 .../api/scala/functions/CrossFunction.scala     |  28 +-
 .../api/scala/operators/CrossOperator.scala     |  69 +--
 .../api/scala/operators/IterateOperators.scala  |   2 +-
 .../test/compiler/util/CompilerTestBase.java    |   4 +-
 .../test/compiler/util/OperatorResolver.java    |   4 +-
 flink-tests/pom.xml                             |   2 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |  14 +-
 .../BulkIterationWithAllReducerITCase.java      |   4 +-
 .../CoGroupConnectedComponentsSecondITCase.java |  12 +-
 .../DependencyConnectedComponentsITCase.java    |  23 +-
 .../aggregators/AggregatorsITCase.java          |  20 +-
 ...nentsWithParametrizableAggregatorITCase.java |  15 +-
 ...entsWithParametrizableConvergenceITCase.java |  15 +-
 .../CustomCompensatableDotProductCoGroup.java   |   6 +-
 .../CustomCompensatableDotProductMatch.java     |   8 +-
 .../CustomCompensatingMap.java                  |   4 +-
 .../CustomRankCombiner.java                     |  10 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  17 +-
 .../test/javaApiOperators/CrossITCase.java      |  15 +-
 .../test/javaApiOperators/DistinctITCase.java   |   4 +-
 .../test/javaApiOperators/FilterITCase.java     |   9 +-
 .../test/javaApiOperators/FlatMapITCase.java    |   5 +-
 .../javaApiOperators/GroupReduceITCase.java     | 578 ++++++++++---------
 .../flink/test/javaApiOperators/JoinITCase.java | 105 ++--
 .../flink/test/javaApiOperators/MapITCase.java  |   7 +-
 .../test/javaApiOperators/ReduceITCase.java     |  19 +-
 .../test/javaApiOperators/UnionITCase.java      |   4 +-
 .../flink/test/operators/CrossITCase.java       |  49 +-
 .../recordJobs/kmeans/udfs/ComputeDistance.java |  16 +-
 .../test/recordJobs/relational/TPCHQuery3.java  |   2 +-
 .../flink/test/util/testjar/KMeansForTest.java  |  16 +-
 pom.xml                                         |   3 +
 244 files changed, 4106 insertions(+), 2872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 637a5e9..386f318 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
 import org.apache.flink.api.java.record.io.avro.example.User;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -125,7 +125,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase {
 	}
 
 
-	public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
+	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
 
 		@Override
 		public User map(Tuple3<String, Integer, String> value) throws Exception {
@@ -133,7 +133,7 @@ public class AvroOutputFormatTest extends JavaProgramTestBase {
 		}
 	}
 
-	public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
+	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
 
 		@Override
 		public ReflectiveUser map(User value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 146c72b..75b7da6 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -141,7 +141,7 @@ public class AvroExternalJarProgram  {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
+	public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -151,7 +151,7 @@ public class AvroExternalJarProgram  {
 		}
 	}
 	
-	public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
+	public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
index 4e8ffa9..ba09e77 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.hadoopcompatibility.mapred.example;
 
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -28,8 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
@@ -88,7 +88,7 @@ public class WordCount {
 	/**
 	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
 	 */
-	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+	public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
 		
 		@Override
 		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
@@ -108,7 +108,7 @@ public class WordCount {
 	/**
 	 * Converts Java data types to Hadoop Writables.
 	 */
-	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+	public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
 		
 		@Override
 		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 36ea378..c00a14a 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -19,8 +19,8 @@
 package org.apache.flink.hadoopcompatibility.mapreduce.example;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.Path;
@@ -89,7 +89,7 @@ public class WordCount {
 	/**
 	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
 	 */
-	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+	public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
 		
 		@Override
 		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
@@ -109,7 +109,7 @@ public class WordCount {
 	/**
 	 * Converts Java data types to Hadoop Writables.
 	 */
-	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+	public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
 		
 		@Override
 		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index bb84cea..65be2f8 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -393,7 +393,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 	// --------------------------------------------------------------------------------------------
 	
 	private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
 		implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
 	{
 		private static final long serialVersionUID = 1L;
@@ -463,7 +463,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 	 * UDF that encapsulates the message sending function for graphs where the edges have no associated values.
 	 */
 	private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message> 
-		extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
 		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
 	{
 		private static final long serialVersionUID = 1L;
@@ -516,7 +516,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
 	 */
 	private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-		extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+		extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
 		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
 	{
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index ea90feb..a4ba6fa 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.spargel.java.MessageIterator;
 import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
 	 * A map function that takes a Long value and creates a 2-tuple out of it:
 	 * <pre>(Long value) -> (value, value)</pre>
 	 */
-	public static final class IdAssigner extends MapFunction<Long, Tuple2<Long, Long>> {
+	public static final class IdAssigner extends RichMapFunction<Long, Tuple2<Long, Long>> {
 		@Override
 		public Tuple2<Long, Long> map(Long value) {
 			return new Tuple2<Long, Long>(value, value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index c7fbaaa..9dfc327 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
 		
 		// enumerate some sample edges and assign an initial uniform probability (rank)
 		DataSet<Tuple2<Long, Double>> intialRanks = env.generateSequence(1, numVertices)
-								.map(new MapFunction<Long, Tuple2<Long, Double>>() {
+								.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
 									public Tuple2<Long, Double> map(Long value) {
 										return new Tuple2<Long, Double>(value, 1.0/numVertices);
 									}
@@ -56,7 +56,7 @@ public class SpargelPageRank {
 		
 		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
 		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, numVertices)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+								.flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
 									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
 										int numOutEdges = (int) (Math.random() * (numVertices / 2));
 										for (int i = 0; i < numOutEdges; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 34c9ad8..43c0b84 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -53,7 +53,7 @@ public class SpargelPageRankCountingVertices {
 		
 		// generate some random edges. the transition probability on each edge is 1/num-out-edges of the source vertex
 		DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = env.generateSequence(1, NUM_VERTICES)
-								.flatMap(new FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+								.flatMap(new RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
 									public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
 										int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
 										for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +67,12 @@ public class SpargelPageRankCountingVertices {
 		
 		// count the number of vertices
 		DataSet<Long> count = vertices
-			.map(new MapFunction<Long, Long>() {
+			.map(new RichMapFunction<Long, Long>() {
 				public Long map(Long value) {
 					return 1L;
 				}
 			})
-			.reduce(new ReduceFunction<Long>() {
+			.reduce(new RichReduceFunction<Long>() {
 				public Long reduce(Long value1, Long value2) {
 					return value1 + value2;
 				}
@@ -80,7 +80,7 @@ public class SpargelPageRankCountingVertices {
 		
 		// enumerate some sample edges and assign an initial uniform probability (rank)
 		DataSet<Tuple2<Long, Double>> intialRanks = vertices
-			.map(new MapFunction<Long, Tuple2<Long, Double>>() {
+			.map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
 				
 				private long numVertices;
 				

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
index a34f2db..16b004c 100644
--- a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.spargel;
 
 import java.io.BufferedReader;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -72,7 +72,7 @@ public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> {
+	public static final class EdgeParser extends RichMapFunction<String, Tuple2<Long, Long>> {
 		public Tuple2<Long, Long> map(String value) {
 			String[] nums = value.split(" ");
 			return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 7320b7b..2b64b84 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -19,7 +19,7 @@
 package org.apache.flink.client.testjar;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -75,7 +75,7 @@ public class WordCount {
 	 * FlatMapFunction. The function takes a line (String) and splits it into 
 	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
 	 */
-	public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+	public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
index 9f10be5..6eccc8a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.compiler.util;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 
 
-public class NoOpFunction extends AbstractFunction {
+public class NoOpFunction extends AbstractRichFunction {
 	private static final long serialVersionUID = 1L;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 571f4e4..10fa34f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,7 +40,6 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
@@ -959,7 +958,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		result1.join(result2)
 				.where(new IdentityKeyExtractor<String>())
 				.equalTo(new IdentityKeyExtractor<String>())
-				.with(new JoinFunction<String, String, String>() {
+				.with(new RichJoinFunction<String, String, String>() {
 					@Override
 					public String join(String first, String second) {
 						return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
index 55c53f0..8226dbf 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,9 +28,8 @@ import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.dag.TempMode;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -256,7 +255,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		
 	}
 	
-	private static class DummyJoiner extends JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
+	private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
 
 		@Override
 		public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 1c30545..3624d86 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -21,16 +21,15 @@ package org.apache.flink.compiler;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -43,13 +42,13 @@ import org.apache.flink.util.Visitor;
 @SuppressWarnings("serial")
 public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 	
-	public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+	public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
 		@Override
 		public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
 		}
 	}
 
-	public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
 		@Override
 		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index ff4d6b0..b2c163b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,14 +27,12 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.compiler.DataStatistics;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.DefaultCostEstimator;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -181,7 +179,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 		}
 		
 		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+		public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
 			List<PlanNode> nodes = this.map.get(name);
 			if (nodes == null || nodes.isEmpty()) {
 				throw new RuntimeException("No node found with the given name and stub class.");
@@ -243,7 +241,7 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 		}
 		
 		@SuppressWarnings("unchecked")
-		public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
+		public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
 			List<Operator<?>> nodes = this.map.get(name);
 			if (nodes == null || nodes.isEmpty()) {
 				throw new RuntimeException("No node found with the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index d50a7d6..0fbf072 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -22,8 +22,8 @@ package org.apache.flink.compiler;
 import static org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
 import static org.junit.Assert.*;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.Order;
@@ -47,7 +47,7 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SourcePlanNode;
 import org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.compiler.testfunctions.DummyJoinFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -1426,10 +1426,10 @@ public class FeedbackPropertiesMatchTest {
 	}
 	
 	private static final MapNode getMapNode() {
-		return new MapNode(new MapOperatorBase<String, String, GenericMap<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
+		return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
 	}
 	
 	private static final MatchNode getJoinNode() {
-		return new MatchNode(new JoinOperatorBase<String, String, String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+		return new MatchNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index f3e513a..b9cc769 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -23,9 +23,9 @@ import java.util.Iterator;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.ReduceGroupOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Collector;
@@ -50,7 +50,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			
 			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
 			
-			data.reduceGroup(new GroupReduceFunction<Double, Double>() {
+			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
 				public void reduce(Iterator<Double> values, Collector<Double> out) {}
 			}).name("reducer")
 			.print().name("sink");
@@ -94,7 +94,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			
 			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
 			
-			ReduceGroupOperator<Long, Long> reduced = data.reduceGroup(new GroupReduceFunction<Long, Long>() {
+			GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
 				public void reduce(Iterator<Long> values, Collector<Long> out) {}
 			}).name("reducer");
 			
@@ -147,7 +147,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			
 			data
 				.groupBy(1)
-				.reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
 			.print().name("sink");
@@ -194,9 +194,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
 			
-			ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
 					.groupBy(1)
-					.reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+					.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer");
 			
@@ -255,7 +255,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
 					public String getKey(Tuple2<String, Double> value) { return value.f0; }
 				})
-				.reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
 			.print().name("sink");
@@ -309,11 +309,11 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
 				.name("source").setParallelism(6);
 			
-			ReduceGroupOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
+			GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
 				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
 					public String getKey(Tuple2<String, Double> value) { return value.f0; }
 				})
-				.reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterator<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer");
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index d3c3e3f..8fc4324 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -31,10 +32,9 @@ import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -261,7 +261,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		
 	}
 	
-	public static final class Join222 extends JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override
 		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -269,13 +269,13 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	public static final class FlatMapJoin extends FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+	public static final class FlatMapJoin extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
 		
 		@Override
 		public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value, Collector<Tuple2<Long, Long>> out) {}
 	}
 	
-	public static final class DummyMap extends MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class DummyMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override
 		public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
@@ -284,14 +284,14 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	}
 	
 	@ConstantFields("0")
-	public static final class Reduce101 extends GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+	public static final class Reduce101 extends RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
 		
 		@Override
 		public void reduce(Iterator<Tuple1<Long>> values, Collector<Tuple1<Long>> out) {}
 	}
 	
 	@ConstantFields("0")
-	public static final class DuplicateValue extends MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+	public static final class DuplicateValue extends RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
 
 		@Override
 		public Tuple2<Long, Long> map(Tuple1<Long> value) throws Exception {
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	public static final class DuplicateValueScalar<T> extends MapFunction<T, Tuple2<T, T>> {
+	public static final class DuplicateValueScalar<T> extends RichMapFunction<T, Tuple2<T, T>> {
 
 		@Override
 		public Tuple2<T, T> map(T value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index a654872..fb8ae8d 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.compiler;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
@@ -46,7 +46,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			
 			DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
 			
-			data.reduce(new ReduceFunction<Double>() {
+			data.reduce(new RichReduceFunction<Double>() {
 				
 				@Override
 				public Double reduce(Double value1, Double value2){
@@ -91,7 +91,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			
 			DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
 			
-			data.reduce(new ReduceFunction<Long>() {
+			data.reduce(new RichReduceFunction<Long>() {
 				
 				@Override
 				public Long reduce(Long value1, Long value2){
@@ -145,7 +145,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			
 			data
 				.groupBy(1)
-				.reduce(new ReduceFunction<Tuple2<String,Double>>() {
+				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
 				@Override
 				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
 					return null;
@@ -205,7 +205,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 				.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
 					public String getKey(Tuple2<String, Double> value) { return value.f0; }
 				})
-				.reduce(new ReduceFunction<Tuple2<String,Double>>() {
+				.reduce(new RichReduceFunction<Tuple2<String,Double>>() {
 				@Override
 				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
 					return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
index 1f653c0..1020c8b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
@@ -174,7 +174,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 		});
 	}
 
-	public static final class DummyFlatMap extends FlatMapFunction<String, Tuple2<String, Integer>> {
+	public static final class DummyFlatMap extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index e04256c..64a4791 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -214,7 +214,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			iter.getWorkset().join(invariantInput)
 				.where(1, 2)
 				.equalTo(1, 2)
-				.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+				.with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
 					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 						return first;
 					}
@@ -224,7 +224,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			result.join(iter.getSolutionSet())
 				.where(1, 0)
 				.equalTo(0, 2)
-				.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+				.with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
 					public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 						return second;
 					}
@@ -263,7 +263,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 		iter.getWorkset().join(invariantInput)
 			.where(1, 2)
 			.equalTo(1, 2)
-			.with(new JoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+			.with(new RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
 				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 					return first;
 				}
@@ -273,7 +273,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 		.join(iter.getSolutionSet())
 			.where(1, 0)
 			.equalTo(1, 2)
-			.with(new JoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+			.with(new RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
 				public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
 					return second;
 				}
@@ -282,7 +282,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			.withConstantSetSecond(joinPreservesSolutionSet ? new String[] {"0->0", "1->1", "2->2" } : null);
 			
 		DataSet<Tuple3<Long, Long, Long>> nextWorkset = joinedWithSolutionSet.groupBy(1, 2)
-			.reduceGroup(new GroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
+			.reduceGroup(new RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
 				public void reduce(Iterator<Tuple3<Long, Long, Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
 			})
 			.name(NEXT_WORKSET_REDUCER_NAME)
@@ -290,7 +290,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 		
 		
 		DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = mapBeforeSolutionDelta ?
-				joinedWithSolutionSet.map(new MapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
+				joinedWithSolutionSet.map(new RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } })
 					.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
 				joinedWithSolutionSet;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
new file mode 100644
index 0000000..2388db4
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
+import org.apache.flink.util.Collector;
+
+public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void join(T first, T second, Collector<T> out) {
+		out.collect(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
deleted file mode 100644
index 0db075f..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.compiler.testfunctions;
-
-import org.apache.flink.api.java.functions.JoinFunction;
-
-public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public T join(T first, T second) {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index fe61f25..42275af 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 
 @Combinable
-public class IdentityGroupReducer<T> extends GroupReduceFunction<T, T> {
+public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
index b6aa40b..29fc2c8 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
@@ -19,9 +19,9 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 
-public class IdentityMapper<T> extends MapFunction<T, T> {
+public class IdentityMapper<T> extends RichMapFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
index 91634cc..7ce267f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 
-public class SelectOneReducer<T> extends ReduceFunction<T> {
+public class SelectOneReducer<T> extends RichReduceFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 26db00e..3f24e65 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 
 @Combinable
-public class Top1GroupReducer<T> extends GroupReduceFunction<T, T> {
+public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 51ad75d..736ee14 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -23,14 +23,13 @@ import java.io.Serializable;
 
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 
-public class DummyCrossStub extends CrossFunction implements Serializable {
+public class DummyCrossStub extends CrossFunction {
 	private static final long serialVersionUID = 1L;
 
+
 	@Override
-	public void cross(Record record1, Record record2, Collector<Record> out) {
-		out.collect(record1);
-		out.collect(record2);
+	public Record cross(Record first, Record second) throws Exception {
+		return first;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
deleted file mode 100644
index f4b2763..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An abstract stub implementation for user-defined functions. It offers default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
- */
-public abstract class AbstractFunction implements Function, Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Runtime context access
-	// --------------------------------------------------------------------------------------------
-	
-	private transient RuntimeContext runtimeContext;
-
-	public void setRuntimeContext(RuntimeContext t) {
-		if (this.runtimeContext == null) {
-			this.runtimeContext = t;
-		} else {
-			throw new IllegalStateException("Error: The runtime context has already been set.");
-		}
-	}
-	
-	public RuntimeContext getRuntimeContext() {
-		if (this.runtimeContext != null) {
-			return this.runtimeContext;
-		} else {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		}
-	}
-	
-	public IterationRuntimeContext getIterationRuntimeContext() {
-		if (this.runtimeContext == null) {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
-			return (IterationRuntimeContext) this.runtimeContext;
-		} else {
-			throw new IllegalStateException("This stub is not part of an iteration step function.");
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Default life cycle methods
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void open(Configuration parameters) throws Exception {}
-
-	@Override
-	public void close() throws Exception {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
new file mode 100644
index 0000000..07b957d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An abstract stub implementation for user-defined functions. It offers default implementations
+ * for {@link #open(Configuration)} and {@link #close()}. It also offers access to the
+ * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ */
+public abstract class AbstractRichFunction implements RichFunction, Serializable {
+	
+	private static final long serialVersionUID = 1L;
+	
+	// --------------------------------------------------------------------------------------------
+	//  Runtime context access
+	// --------------------------------------------------------------------------------------------
+	
+	private transient RuntimeContext runtimeContext;
+
+	public void setRuntimeContext(RuntimeContext t) {
+		if (this.runtimeContext == null) {
+			this.runtimeContext = t;
+		} else {
+			throw new IllegalStateException("Error: The runtime context has already been set.");
+		}
+	}
+	
+	public RuntimeContext getRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return this.runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+	
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		if (this.runtimeContext == null) {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
+			return (IterationRuntimeContext) this.runtimeContext;
+		} else {
+			throw new IllegalStateException("This stub is not part of an iteration step function.");
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Default life cycle methods
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void open(Configuration parameters) throws Exception {}
+
+	@Override
+	public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
new file mode 100644
index 0000000..5c200af
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+
+public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+	
+	/**
+	 * This method must be implemented to provide a user implementation of a
+	 * coGroup. It is called with the records from the first and the second
+	 * input that share the same key.
+	 * 
+	 * @param first The records from the first input which were paired with the key.
+	 * @param second The records from the second input which were paired with the key.
+	 * @param out A collector that collects all output pairs.
+	 */
+	void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) throws Exception;
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
new file mode 100644
index 0000000..d72c4c8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Generic interface used for combiners.
+ */
+public interface CombineFunction<T> extends Function, Serializable {
+
+	T combine(Iterator<T> records) throws Exception;
+}