You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:46 UTC

[15/39] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 2f749d4..4c8177a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -147,8 +147,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 		}
 	}
 
-	public static final class NeighborWithComponentIDJoin extends JoinFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -161,8 +160,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 		}
 	}
 
-	public static final class MinimumReduce extends GroupReduceFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -189,8 +187,7 @@ public class ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
 	}
 
 	@SuppressWarnings("serial")
-	public static final class MinimumIdFilter extends FlatMapFunction
-		<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+	public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
 
 		private static LongSumAggregatorWithParameter aggr;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index fa1676f..104c3df 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -25,9 +25,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -139,8 +139,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 		}
 	}
 
-	public static final class NeighborWithComponentIDJoin extends JoinFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -153,8 +152,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 		}
 	}
 
-	public static final class MinimumReduce extends GroupReduceFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		private static final long serialVersionUID = 1L;
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -181,8 +179,7 @@ public class ConnectedComponentsWithParametrizableConvergenceITCase extends Java
 	}
 
 	@SuppressWarnings("serial")
-	public static final class MinimumIdFilter extends FlatMapFunction
-		<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+	public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
 
 		private static LongSumAggregator aggr;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 0475a4f..1ec0eb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
@@ -32,7 +32,7 @@ import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatableDotProductCoGroup extends AbstractFunction implements GenericCoGrouper<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
+public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
index 28c77ba..b44d914 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
@@ -31,8 +31,8 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatableDotProductMatch extends AbstractFunction implements
-		GenericJoiner<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
+public class CustomCompensatableDotProductMatch extends AbstractRichFunction implements
+		FlatJoinFunction<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank>
 {
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index 74426c0..d83b33b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
@@ -29,7 +29,7 @@ import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatingMap extends AbstractFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
+public class CustomCompensatingMap extends AbstractRichFunction implements GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 8af9247..1e08a9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -20,15 +20,15 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
 import org.apache.flink.util.Collector;
 
 
-public class CustomRankCombiner extends AbstractFunction implements GenericGroupReduce<VertexWithRank, VertexWithRank>,
-		GenericCombine<VertexWithRank>
+public class CustomRankCombiner extends AbstractRichFunction implements GroupReduceFunction<VertexWithRank, VertexWithRank>,
+		FlatCombineFunction<VertexWithRank>
 {
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index b914c1c..3749c1d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -24,8 +24,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -301,7 +302,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 	
 	}
 	
-	public static class Tuple5CoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+	public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -330,7 +331,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class CustomTypeCoGroup extends CoGroupFunction<CustomType, CustomType, CustomType> {
+	public static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -358,7 +359,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		
 	}
 	
-	public static class MixedCoGroup extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+	public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -388,7 +389,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		
 	}
 	
-	public static class MixedCoGroup2 extends CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
+	public static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -417,7 +418,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		
 	}
 	
-	public static class Tuple3ReturnLeft extends CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+	public static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		
 		private static final long serialVersionUID = 1L;
 
@@ -434,7 +435,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class Tuple5ReturnRight extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+	public static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 		
 		private static final long serialVersionUID = 1L;
 
@@ -456,7 +457,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
 	}
 	
-	public static class Tuple5CoGroupBC extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+	public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index dabe7fc..304dda2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.CrossFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -400,7 +401,7 @@ public class CrossITCase extends JavaProgramTestBase {
 	
 	}
 	
-	public static class Tuple5Cross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
+	public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -416,7 +417,7 @@ public class CrossITCase extends JavaProgramTestBase {
 
 	}
 	
-	public static class CustomTypeCross extends CrossFunction<CustomType, CustomType, CustomType> {
+	public static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -429,7 +430,7 @@ public class CrossITCase extends JavaProgramTestBase {
 		
 	}
 	
-	public static class MixedCross extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+	public static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -444,7 +445,7 @@ public class CrossITCase extends JavaProgramTestBase {
 	}
 	
 	
-	public static class Tuple3ReturnLeft extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+	public static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
 		
 		private static final long serialVersionUID = 1L;
 
@@ -457,7 +458,7 @@ public class CrossITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class Tuple5ReturnRight extends CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+	public static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 		
 		private static final long serialVersionUID = 1L;
 
@@ -473,7 +474,7 @@ public class CrossITCase extends JavaProgramTestBase {
 
 	}
 	
-	public static class Tuple5CrossBC extends CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+	public static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 203117c..0c6f3cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -164,7 +164,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 										return in.myInt;
 									}
 								})
-						.map(new MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() {
+						.map(new RichMapFunction<CustomType, Tuple1<Integer>>() {
 							@Override
 							public Tuple1<Integer> map(CustomType value) throws Exception {
 								return new Tuple1<Integer>(value.myInt);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index 96174da..6613bc1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -38,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class FilterITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 8; 
+	private static int NUM_PROGRAMS = 8;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -268,7 +269,7 @@ public class FilterITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 
 							int literal = -1;
@@ -306,7 +307,7 @@ public class FilterITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple3<Integer, Long, String>> filterDs = ds.
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 							private  int broadcastSum = 0;
 							

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 1c97347..a6dd377 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -323,7 +324,7 @@ public class FlatMapITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple3<Integer, Long, String>> bcFlatMapDs = ds.
-						flatMap(new FlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+						flatMap(new RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 							private final Tuple3<Integer, Long, String> outTuple = 
 									new Tuple3<Integer, Long, String>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 6556b5e..7376e86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -24,10 +24,11 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -89,332 +90,336 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 	private static class GroupReduceProgs {
 		
 		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
+
+			switch (progId) {
+				case 1: {
 				
 				/*
 				 * check correctness of groupReduce on tuples with key field selector
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Integer, Long>> reduceDs = ds.
-						groupBy(1).reduceGroup(new Tuple3GroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1\n" +
-						"5,2\n" +
-						"15,3\n" +
-						"34,4\n" +
-						"65,5\n" +
-						"111,6\n";
-			}
-			case 2: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+							groupBy(1).reduceGroup(new Tuple3GroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1\n" +
+							"5,2\n" +
+							"15,3\n" +
+							"34,4\n" +
+							"65,5\n" +
+							"111,6\n";
+				}
+				case 2: {
 				
 				/*
 				 * check correctness of groupReduce on tuples with multiple key field selector
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-				DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
-						groupBy(4,0).reduceGroup(new Tuple5GroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,0,P-),1\n" +
-						"2,3,0,P-),1\n" +
-						"2,2,0,P-),2\n" +
-						"3,9,0,P-),2\n" +
-						"3,6,0,P-),3\n" +
-						"4,17,0,P-),1\n" +
-						"4,17,0,P-),2\n" +
-						"5,11,0,P-),1\n" +
-						"5,29,0,P-),2\n" +
-						"5,25,0,P-),3\n";
-			}
-			case 3: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+					DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+							groupBy(4, 0).reduceGroup(new Tuple5GroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1,0,P-),1\n" +
+							"2,3,0,P-),1\n" +
+							"2,2,0,P-),2\n" +
+							"3,9,0,P-),2\n" +
+							"3,6,0,P-),3\n" +
+							"4,17,0,P-),1\n" +
+							"4,17,0,P-),2\n" +
+							"5,11,0,P-),1\n" +
+							"5,29,0,P-),2\n" +
+							"5,25,0,P-),3\n";
+				}
+				case 3: {
 				
 				/*
 				 * check correctness of groupReduce on tuples with key field selector and group sorting
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(1);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).sortGroup(2,Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,Hello-Hello world\n" +
-						"15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
-						"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
-						"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
-						"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
-								
-			}
-			case 4: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+							groupBy(1).sortGroup(2, Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1,Hi\n" +
+							"5,2,Hello-Hello world\n" +
+							"15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n" +
+							"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
+							"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
+							"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+
+				}
+				case 4: {
 				/*
 				 * check correctness of groupReduce on tuples with key extractor
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Integer, Long>> reduceDs = ds.
-						groupBy(new KeySelector<Tuple3<Integer,Long,String>, Long>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Long getKey(Tuple3<Integer, Long, String> in) {
-										return in.f1;
-									}
-								}).reduceGroup(new Tuple3GroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1\n" +
-						"5,2\n" +
-						"15,3\n" +
-						"34,4\n" +
-						"65,5\n" +
-						"111,6\n";
-				
-			}
-			case 5: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple2<Integer, Long>> reduceDs = ds.
+							groupBy(new KeySelector<Tuple3<Integer, Long, String>, Long>() {
+								private static final long serialVersionUID = 1L;
+
+								@Override
+								public Long getKey(Tuple3<Integer, Long, String> in) {
+									return in.f1;
+								}
+							}).reduceGroup(new Tuple3GroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1\n" +
+							"5,2\n" +
+							"15,3\n" +
+							"34,4\n" +
+							"65,5\n" +
+							"111,6\n";
+
+				}
+				case 5: {
 				
 				/*
 				 * check correctness of groupReduce on custom type with type extractor
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> reduceDs = ds.
-						groupBy(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).reduceGroup(new CustomTypeGroupReduce());
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,Hello!\n" +
-						"2,3,Hello!\n" +
-						"3,12,Hello!\n" +
-						"4,30,Hello!\n" +
-						"5,60,Hello!\n" +
-						"6,105,Hello!\n";
-			}
-			case 6: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+					DataSet<CustomType> reduceDs = ds.
+							groupBy(new KeySelector<CustomType, Integer>() {
+								private static final long serialVersionUID = 1L;
+
+								@Override
+								public Integer getKey(CustomType in) {
+									return in.myInt;
+								}
+							}).reduceGroup(new CustomTypeGroupReduce());
+
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,0,Hello!\n" +
+							"2,3,Hello!\n" +
+							"3,12,Hello!\n" +
+							"4,30,Hello!\n" +
+							"5,60,Hello!\n" +
+							"6,105,Hello!\n";
+				}
+				case 6: {
 				
 				/*
 				 * check correctness of all-groupreduce for tuples
 				 */
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "231,91,Hello World\n";
-			}
-			case 7: {
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "231,91,Hello World\n";
+				}
+				case 7: {
 				/*
 				 * check correctness of all-groupreduce for custom types
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "91,210,Hello!";
-			}
-			case 8: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+					DataSet<CustomType> reduceDs = ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
+
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "91,210,Hello!";
+				}
+				case 8: {
 				
 				/*
 				 * check correctness of groupReduce with broadcast set
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,55\n" +
-						"5,2,55\n" +
-						"15,3,55\n" +
-						"34,4,55\n" +
-						"65,5,55\n" +
-						"111,6,55\n";
-			}
-			case 9: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+							groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1,55\n" +
+							"5,2,55\n" +
+							"15,3,55\n" +
+							"34,4,55\n" +
+							"65,5,55\n" +
+							"111,6,55\n";
+				}
+				case 9: {
 				
 				/*
 				 * check correctness of groupReduce if UDF returns input objects multiple times and changes it in between
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "11,1,Hi!\n" +
-						"21,1,Hi again!\n" +
-						"12,2,Hi!\n" +
-						"22,2,Hi again!\n" +
-						"13,2,Hi!\n" +
-						"23,2,Hi again!\n";
-			}
-			case 10: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+							groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "11,1,Hi!\n" +
+							"21,1,Hi again!\n" +
+							"12,2,Hi!\n" +
+							"22,2,Hi again!\n" +
+							"13,2,Hi!\n" +
+							"23,2,Hi again!\n";
+				}
+				case 10: {
 				
 				/*
 				 * check correctness of groupReduce on custom type with key extractor and combine
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-				DataSet<CustomType> reduceDs = ds.
-						groupBy(new KeySelector<CustomType, Integer>() {
-									private static final long serialVersionUID = 1L;
-									@Override
-									public Integer getKey(CustomType in) {
-										return in.myInt;
-									}
-								}).reduceGroup(new CustomTypeGroupReduceWithCombine());
-				
-				reduceDs.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,0,test1\n" +
-						"2,3,test2\n" +
-						"3,12,test3\n" +
-						"4,30,test4\n" +
-						"5,60,test5\n" +
-						"6,105,test6\n";
-			}
-			case 11: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+					DataSet<CustomType> reduceDs = ds.
+							groupBy(new KeySelector<CustomType, Integer>() {
+								private static final long serialVersionUID = 1L;
+
+								@Override
+								public Integer getKey(CustomType in) {
+									return in.myInt;
+								}
+							}).reduceGroup(new CustomTypeGroupReduceWithCombine());
+
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,0,test1\n" +
+							"2,3,test2\n" +
+							"3,12,test3\n" +
+							"4,30,test4\n" +
+							"5,60,test5\n" +
+							"6,105,test6\n";
+				}
+				case 11: {
 				
 				/*
 				 * check correctness of groupReduce on tuples with combine
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple2<Integer, String>> reduceDs = ds.
-						groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,test1\n" +
-						"5,test2\n" +
-						"15,test3\n" +
-						"34,test4\n" +
-						"65,test5\n" +
-						"111,test6\n";
-			}
-			// all-groupreduce with combine
-			case 12: {
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(2); // important because it determines how often the combiner is called
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple2<Integer, String>> reduceDs = ds.
+							groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,test1\n" +
+							"5,test2\n" +
+							"15,test3\n" +
+							"34,test4\n" +
+							"65,test5\n" +
+							"111,test6\n";
+				}
+				// all-groupreduce with combine
+				case 12: {
 				
 				/*
 				 * check correctness of all-groupreduce for tuples with combine
 				 */
 
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env)
-							.map(new IdentityMapper<Tuple3<Integer,Long,String>>()).setParallelism(4);
-				
-				Configuration cfg = new Configuration();
-				cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
-				DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
-						.withParameters(cfg);
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
-			}
-			// descending sort not working
-			case 13: {
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env)
+							.map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
+
+					Configuration cfg = new Configuration();
+					cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+					DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
+							.withParameters(cfg);
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
+				}
+				// descending sort not working
+				case 13: {
 				
 				/*
 				 * check correctness of groupReduce on tuples with key field selector and group sorting
 				 */
-				
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.setDegreeOfParallelism(1);
-				
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
-						groupBy(1).sortGroup(2,Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
-				
-				reduceDs.writeAsCsv(resultPath);
-				env.execute();
-				
-				// return expected result
-				return "1,1,Hi\n" +
-						"5,2,Hello world-Hello\n" +
-						"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
-						"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-						"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-						"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
-				
-			}
-			default: 
-			throw new IllegalArgumentException("Invalid program id");
+
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+					env.setDegreeOfParallelism(1);
+
+					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+					DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.
+							groupBy(1).sortGroup(2, Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+					reduceDs.writeAsCsv(resultPath);
+					env.execute();
+
+					// return expected result
+					return "1,1,Hi\n" +
+							"5,2,Hello world-Hello\n" +
+							"15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n" +
+							"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+							"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+							"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+				}
+				default: {
+					throw new IllegalArgumentException("Invalid program id");
+				}
 			}
 		}
 	
 	}
 	
-	public static class Tuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+	public static class Tuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
 		private static final long serialVersionUID = 1L;
 
 
@@ -436,7 +441,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class Tuple3SortedGroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+	public static class Tuple3SortedGroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 
 
@@ -462,7 +467,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class Tuple5GroupReduce extends GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+	public static class Tuple5GroupReduce implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -486,7 +491,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class CustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> {
+	public static class CustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
 		
 
@@ -511,8 +516,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 			
 		}
 	}
-	
-	public static class InputReturningTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+
+	public static class InputReturningTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -534,7 +540,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class AllAddingTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+	public static class AllAddingTuple3GroupReduce implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
@@ -554,7 +560,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class AllAddingCustomTypeGroupReduce extends GroupReduceFunction<CustomType, CustomType> {
+	public static class AllAddingCustomTypeGroupReduce implements GroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
@@ -579,7 +585,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class BCTuple3GroupReduce extends GroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> {
+	public static class BCTuple3GroupReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private String f2Replace = "";
 		
@@ -613,8 +619,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	@org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-	public static class Tuple3GroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+	@RichGroupReduceFunction.Combinable
+	public static class Tuple3GroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -650,8 +656,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	@org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-	public static class Tuple3AllGroupReduceWithCombine extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+	@RichGroupReduceFunction.Combinable
+	public static class Tuple3AllGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
@@ -686,8 +692,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	@org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-	public static class CustomTypeGroupReduceWithCombine extends GroupReduceFunction<CustomType, CustomType> {
+	@RichGroupReduceFunction.Combinable
+	public static class CustomTypeGroupReduceWithCombine extends RichGroupReduceFunction<CustomType, CustomType> {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
@@ -723,7 +729,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class IdentityMapper<T> extends MapFunction<T, T> {
+	public static final class IdentityMapper<T> extends RichMapFunction<T, T> {
 
 		@Override
 		public T map(T value) { return value; }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index ffad949..a293cbf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -33,6 +35,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -101,7 +104,7 @@ public class JoinITCase extends JavaProgramTestBase {
 						ds1.join(ds2)
 						.where(1)
 						.equalTo(1)
-						.with(new T3T5Join());
+						.with(new T3T5FlatJoin());
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();
@@ -126,7 +129,7 @@ public class JoinITCase extends JavaProgramTestBase {
 						ds1.join(ds2)
 						   .where(0,1)
 						   .equalTo(0,4)
-						   .with(new T3T5Join());
+						   .with(new T3T5FlatJoin());
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();
@@ -177,7 +180,7 @@ public class JoinITCase extends JavaProgramTestBase {
 				DataSet<Tuple2<String, String>> joinDs = ds1.joinWithHuge(ds2)
 															.where(1)
 															.equalTo(1)
-															.with(new T3T5Join());
+															.with(new T3T5FlatJoin());
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();
@@ -202,7 +205,7 @@ public class JoinITCase extends JavaProgramTestBase {
 						ds1.joinWithTiny(ds2)
 						   .where(1)
 						   .equalTo(1)
-						   .with(new T3T5Join());
+						   .with(new T3T5FlatJoin());
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();
@@ -292,35 +295,35 @@ public class JoinITCase extends JavaProgramTestBase {
 			}
 			case 9: {
 			
-			/*
-			 * Join on a tuple input with key field selector and a custom type input with key extractor
-			 */
-			
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				/*
+				 * Join on a tuple input with key field selector and a custom type input with key extractor
+				 */
 
-			DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-			DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-			DataSet<Tuple2<String, String>> joinDs = 
-					ds1.join(ds2)
-					   .where(new KeySelector<CustomType, Integer>() {
-								   @Override
-								   public Integer getKey(CustomType value) {
-									   return value.myInt;
-								   }
-							   }
-							   )
-					   .equalTo(0)
-					   .with(new CustT3Join());
-			
-			joinDs.writeAsCsv(resultPath);
-			env.execute();
-			
-			// return expected result
-			return "Hi,Hi\n" +
-					"Hello,Hello\n" +
-					"Hello world,Hello\n";
-			
-			}
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple2<String, String>> joinDs =
+						ds1.join(ds2)
+						   .where(new KeySelector<CustomType, Integer>() {
+									  @Override
+									  public Integer getKey(CustomType value) {
+										  return value.myInt;
+									  }
+								  }
+						   )
+						   .equalTo(0)
+						   .with(new CustT3Join());
+
+				joinDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "Hi,Hi\n" +
+						"Hello,Hello\n" +
+						"Hello world,Hello\n";
+
+				}
 			case 10: {
 				
 				/*
@@ -458,38 +461,39 @@ public class JoinITCase extends JavaProgramTestBase {
 	
 	}
 	
-	public static class T3T5Join extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
+	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
 
 		@Override
-		public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second)  {
-			
-			return new Tuple2<String,String>(first.f2, second.f3);
+		public void join(Tuple3<Integer, Long, String> first,
+				Tuple5<Integer, Long, Integer, String, Long> second,
+				Collector<Tuple2<String,String>> out)  {
+
+			out.collect (new Tuple2<String,String> (first.f2, second.f3));
 		}
-		
+
 	}
 	
-	public static class LeftReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
+	public static class LeftReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
 
 		@Override
 		public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second) {
+												  Tuple5<Integer, Long, Integer, String, Long> second) {
 			
 			return first;
 		}
 	}
 	
-	public static class RightReturningJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+	public static class RightReturningJoin implements JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 
 		@Override
 		public Tuple5<Integer, Long, Integer, String, Long> join(Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second) {
+																 Tuple5<Integer, Long, Integer, String, Long> second) {
 			
 			return second;
 		}
 	}
 		
-	public static class T3T5BCJoin extends JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
+	public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
 
 		private int broadcast;
 		
@@ -505,6 +509,7 @@ public class JoinITCase extends JavaProgramTestBase {
 			
 		}
 
+		/*
 		@Override
 		public Tuple3<String, String, Integer> join(
 				Tuple3<Integer, Long, String> first,
@@ -512,19 +517,25 @@ public class JoinITCase extends JavaProgramTestBase {
 
 			return new Tuple3<String, String, Integer>(first.f2, second.f3, broadcast);
 		}
+		*/
+
+		@Override
+		public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, String, Integer>> out) throws Exception {
+			out.collect(new Tuple3<String, String, Integer> (first.f2, second.f3, broadcast));
+		}
 	}
 	
-	public static class T3CustJoin extends JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
+	public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
 
 		@Override
 		public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
-				CustomType second) {
+										   CustomType second) {
 
 			return new Tuple2<String, String>(first.f2, second.myString);
 		}
 	}
 	
-	public static class CustT3Join extends JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
+	public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
 
 		@Override
 		public Tuple2<String, String> join(CustomType first, Tuple3<Integer, Long, String> second) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index 0921e82..4f1fb1a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -25,7 +25,8 @@ import java.util.LinkedList;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -396,7 +397,7 @@ public class MapITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-						map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+						map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 							private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 							private Integer f2Replace = 0;
@@ -457,7 +458,7 @@ public class MapITCase extends JavaProgramTestBase {
 				final int testValue = 666;
 				conf.setInteger(testKey, testValue);
 				DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-						map(new MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+						map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 							
 							@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 6cc1061..a296a09 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -23,8 +23,9 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
@@ -270,7 +271,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 						"65,5,Hi again!\n" +
 						"111,6,Hi again!\n";
 			}
-			default: 
+			default:
 				throw new IllegalArgumentException("Invalid program id");
 			}
 			
@@ -278,7 +279,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 	
 	}
 	
-	public static class Tuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+	public static class Tuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 		private final String f2Replace;
@@ -306,7 +307,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class Tuple5Reduce extends ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
+	public static class Tuple5Reduce implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();
 		
@@ -321,7 +322,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class CustomTypeReduce extends ReduceFunction<CustomType> {
+	public static class CustomTypeReduce implements ReduceFunction<CustomType> {
 		private static final long serialVersionUID = 1L;
 		private final CustomType out = new CustomType();
 		
@@ -336,7 +337,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class InputReturningTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+	public static class InputReturningTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -350,7 +351,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class AllAddingTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+	public static class AllAddingTuple3Reduce implements ReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 		
@@ -364,7 +365,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class AllAddingCustomTypeReduce extends ReduceFunction<CustomType> {
+	public static class AllAddingCustomTypeReduce implements ReduceFunction<CustomType> {
 		private static final long serialVersionUID = 1L;
 		private final CustomType out = new CustomType();
 		
@@ -379,7 +380,7 @@ public class ReduceITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static class BCTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
+	public static class BCTuple3Reduce extends RichReduceFunction<Tuple3<Integer, Long, String>> {
 		private static final long serialVersionUID = 1L;
 		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
 		private String f2Replace = "";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 09191cc..a636ba4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -145,7 +145,7 @@ public class UnionITCase extends JavaProgramTestBase {
 				
 				// Don't know how to make an empty result in an other way than filtering it 
 				DataSet<Tuple3<Integer, Long, String>> empty = CollectionDataSets.get3TupleDataSet(env).
-						filter(new FilterFunction<Tuple3<Integer,Long,String>>() {
+						filter(new RichFilterFunction<Tuple3<Integer,Long,String>>() {
 							private static final long serialVersionUID = 1L;
 
 							@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index ed573be..aaad08c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -49,6 +50,7 @@ import java.util.LinkedList;
 /**
  */
 @RunWith(Parameterized.class)
+//@Ignore("Test needs to be adapted to new cross signature")
 public class CrossITCase extends RecordAPITestBase {
 
 	private static final Log LOG = LogFactory.getLog(CrossITCase.class);
@@ -61,13 +63,30 @@ public class CrossITCase extends RecordAPITestBase {
 		super(testConfig);
 	}
 
-	private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
+	//private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
 
-	private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
+	//private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
 
-	private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
-		+ "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
-		+ "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
+	//private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
+	//	+ "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
+	//	+ "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
+
+	//private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 1\n4 2\n4 2\n5 0\n5 0\n5 1\n," +
+	//		"5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" +
+	//		"7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" +
+	//		"8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n";
+
+	//private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" +
+	//		"7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" +
+	//		"6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" +
+	//		"8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" +
+	//		"6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6";
+
+
+	private static final String LEFT_IN = "1 1\n2 2\n3 3\n";
+	private static final String RIGHT_IN = "3 6\n4 4\n4 8\n";
+
+	private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 4\n9 2\n9 6\n";
 
 	@Override
 	protected void preSubmit() throws Exception {
@@ -84,7 +103,7 @@ public class CrossITCase extends RecordAPITestBase {
 		private IntValue integer = new IntValue();
 		
 		@Override
-		public void cross(Record record1, Record record2, Collector<Record> out) {
+		public Record cross(Record record1, Record record2) throws Exception {
 			string = record1.getField(1, string);
 			int val1 = Integer.parseInt(string.toString());
 			string = record2.getField(1, string);
@@ -95,16 +114,14 @@ public class CrossITCase extends RecordAPITestBase {
 			int key2 = Integer.parseInt(string.toString());
 			
 			LOG.debug("Processing { [" + key1 + "," + val1 + "] , [" + key2 + "," + val2 + "] }");
-			
-			if (val1 + val2 <= 6) {
-				string.setValue((key1 + key2 + 2) + "");
-				integer.setValue(val2 - val1 + 1);
-				
-				record1.setField(0, string);
-				record1.setField(1, integer);
-				
-				out.collect(record1);
-			}
+
+			string.setValue((key1 + key2 + 2) + "");
+			integer.setValue(val2 - val1 + 1);
+
+			record1.setField(0, string);
+			record1.setField(1, integer);
+
+			return record1;
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
index a545e05..0f58d18 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
@@ -47,19 +47,19 @@ public class ComputeDistance extends CrossFunction implements Serializable {
 	 * 3: distance
 	 */
 	@Override
-	public void cross(Record dataPointRecord, Record clusterCenterRecord, Collector<Record> out) {
-		
+	public Record cross(Record dataPointRecord, Record clusterCenterRecord) throws Exception {
+
 		CoordVector dataPoint = dataPointRecord.getField(1, CoordVector.class);
-		
+
 		IntValue clusterCenterId = clusterCenterRecord.getField(0, IntValue.class);
 		CoordVector clusterPoint = clusterCenterRecord.getField(1, CoordVector.class);
-	
+
 		this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-		
-		// add cluster center id and distance to the data point record 
+
+		// add cluster center id and distance to the data point record
 		dataPointRecord.setField(2, clusterCenterId);
 		dataPointRecord.setField(3, this.distance);
-		
-		out.collect(dataPointRecord);
+
+		return dataPointRecord;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
index 15640c0..441dc39 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
@@ -89,7 +89,7 @@ public class TPCHQuery3 implements Program, ProgramDescription {
 		/**
 		 * Reads the filter literals from the configuration.
 		 * 
-		 * @see org.apache.flink.api.common.functions.Function#open(org.apache.flink.configuration.Configuration)
+		 * @see org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)
 		 */
 		@Override
 		public void open(Configuration parameters) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 97367f7..7149cd3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -170,7 +170,7 @@ public class KMeansForTest implements Program {
 	// *************************************************************************
 
 	/** Converts a Tuple2<Double,Double> into a Point. */
-	public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
+	public static final class TuplePointConverter extends RichMapFunction<Tuple2<Double, Double>, Point> {
 
 		@Override
 		public Point map(Tuple2<Double, Double> t) throws Exception {
@@ -179,7 +179,7 @@ public class KMeansForTest implements Program {
 	}
 
 	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
-	public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+	public static final class TupleCentroidConverter extends RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
 		@Override
 		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
@@ -188,7 +188,7 @@ public class KMeansForTest implements Program {
 	}
 
 	/** Determines the closest cluster center for a data point. */
-	public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
+	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
 		private Collection<Centroid> centroids;
 
 		/** Reads the centroid values from a broadcast variable into a collection. */
@@ -236,7 +236,7 @@ public class KMeansForTest implements Program {
 	}
 
 	/** Appends a count variable to the tuple. */
-	public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
+	public static final class CountAppender extends RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
 
 		@Override
 		public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
@@ -245,7 +245,7 @@ public class KMeansForTest implements Program {
 	}
 
 	/** Sums and counts point coordinates. */
-	public static final class CentroidAccumulator extends ReduceFunction<DummyTuple3IntPointLong> {
+	public static final class CentroidAccumulator extends RichReduceFunction<DummyTuple3IntPointLong> {
 
 		@Override
 		public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) {
@@ -254,7 +254,7 @@ public class KMeansForTest implements Program {
 	}
 
 	/** Computes new centroid from coordinate sum and count of points. */
-	public static final class CentroidAverager extends MapFunction<DummyTuple3IntPointLong, Centroid> {
+	public static final class CentroidAverager extends RichMapFunction<DummyTuple3IntPointLong, Centroid> {
 
 		@Override
 		public Centroid map(DummyTuple3IntPointLong value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e0f153..8058539 100644
--- a/pom.xml
+++ b/pom.xml
@@ -274,6 +274,9 @@ under the License.
 					<activeByDefault>false</activeByDefault>
 					<jdk>1.8</jdk>
 				</activation>
+                <modules>
+                    <module>flink-java8-tests</module>
+                </modules>
 				<build>
 					<plugins>
 						<plugin>