You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/01 02:36:43 UTC

[02/16] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index a99ffc8..e57d20c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -24,12 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase.MockCoGroupStub;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -42,7 +39,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Record, Record>>
+public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Record, Record>>
 {
 	private static final long SORT_MEM = 3*1024*1024;
 	
@@ -403,7 +400,7 @@ public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Rec
 		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
 	}
 	
-	public static class MockFailingCoGroupStub extends CoGroupFunction {
+	public static class MockFailingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
@@ -443,7 +440,7 @@ public class CoGroupTaskTest extends DriverTestBase<GenericCoGrouper<Record, Rec
 	
 	}
 	
-	public static final class MockDelayingCoGroupStub extends CoGroupFunction {
+	public static final class MockDelayingCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
 		private static final long serialVersionUID = 1L;
 		
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 694ae48..885a509 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -24,10 +24,8 @@ import java.util.HashMap;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
 import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -37,7 +35,7 @@ import org.apache.flink.types.Record;
 import org.junit.Test;
 
 
-public class CombineTaskExternalITCase extends DriverTestBase<GenericGroupReduce<Record, ?>> {
+public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFunction<Record, ?>> {
 	
 	private static final long COMBINE_MEM = 3 * 1024 * 1024;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index f42c443..99440aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -25,12 +25,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -43,7 +41,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CombineTaskTest extends DriverTestBase<GenericGroupReduce<Record, ?>>
+public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
 {
 	private static final long COMBINE_MEM = 3 * 1024 * 1024;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
index 79d47fe..ced86d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskExternalITCase.java
@@ -21,9 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.runtime.operators.CrossDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.runtime.operators.CrossTaskTest.MockCrossStub;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -31,7 +29,7 @@ import org.apache.flink.types.Record;
 import org.junit.Test;
 
 
-public class CrossTaskExternalITCase extends DriverTestBase<GenericCrosser<Record, Record, Record>>
+public class CrossTaskExternalITCase extends DriverTestBase<CrossFunction<Record, Record, Record>>
 {
 	private static final long CROSS_MEM = 1024 * 1024;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 01015d3..ee51ad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -23,20 +23,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.runtime.operators.CrossDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record, Record>>
+public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
 {
 	private static final long CROSS_MEM = 1024 * 1024;
 
@@ -583,26 +579,26 @@ public class CrossTaskTest extends DriverTestBase<GenericCrosser<Record, Record,
 		Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
 	}
 	
-	public static final class MockCrossStub extends CrossFunction {
+	public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
-		public void cross(Record record1, Record record2, Collector<Record> out) {
-			out.collect(record1);
+		public Record cross(Record record1, Record record2) throws Exception {
+			return record1;
 		}
 	}
 	
-	public static final class MockFailingCrossStub extends CrossFunction {
+	public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;
 		
 		@Override
-		public void cross(Record record1, Record record2, Collector<Record> out) {
+		public Record cross(Record record1, Record record2) {
 			if (++this.cnt >= 10) {
 				throw new ExpectedTestException();
 			}
-			out.collect(record1);
+			return record1;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 2115bc8..25998f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -21,12 +21,10 @@ package org.apache.flink.runtime.operators;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.MatchDriver;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -35,7 +33,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class MatchTaskExternalITCase extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
 {
 	private static final long HASH_MEM = 4*1024*1024;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 6eea546..9b7c582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -23,12 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.MatchDriver;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -42,7 +40,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
 {
 	private static final long HASH_MEM = 6*1024*1024;
 	
@@ -951,11 +949,10 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		private int cnt = 0;
 		
 		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) {
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
 			if (++this.cnt >= 10) {
 				throw new ExpectedTestException();
 			}
-			
 			out.collect(record1);
 		}
 	}
@@ -964,10 +961,11 @@ public class MatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record,
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) {
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
 			try {
 				Thread.sleep(100);
-			} catch (InterruptedException e) { }
+			} catch (InterruptedException e) {
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index bf51f41..ebee0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -27,13 +27,11 @@ import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -43,7 +41,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class ReduceTaskExternalITCase extends DriverTestBase<GenericGroupReduce<Record, Record>>
+public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunction<Record, Record>>
 {
 	private static final Log LOG = LogFactory.getLog(ReduceTaskExternalITCase.class);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 0d4e370..b367a7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -28,13 +28,11 @@ import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.record.functions.ReduceFunction;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -48,7 +46,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class ReduceTaskTest extends DriverTestBase<GenericGroupReduce<Record, Record>>
+public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, Record>>
 {
 	private static final Log LOG = LogFactory.getLog(ReduceTaskTest.class);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
index 8c8715e..d603dec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllGroupReduceDriverTest.java
@@ -23,8 +23,8 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.AllGroupReduceDriver;
@@ -46,8 +46,8 @@ public class AllGroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverImmutableEmpty() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -72,8 +72,8 @@ public class AllGroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverImmutable() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -112,8 +112,8 @@ public class AllGroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverMutable() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -152,7 +152,7 @@ public class AllGroupReduceDriverTest {
 	//  Test UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class ConcatSumReducer extends GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+	public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
 		public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
@@ -169,7 +169,7 @@ public class AllGroupReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumMutableReducer extends GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
index de01d74..b124e51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/AllReduceDriverTest.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericReduce;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.AllReduceDriver;
@@ -44,8 +44,8 @@ public class AllReduceDriverTest {
 	@Test
 	public void testAllReduceDriverImmutableEmpty() {
 		try {
-			TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -71,8 +71,8 @@ public class AllReduceDriverTest {
 	public void testAllReduceDriverImmutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -103,8 +103,8 @@ public class AllReduceDriverTest {
 			}
 			
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TypeInformation<Tuple2<String, Integer>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -145,8 +145,8 @@ public class AllReduceDriverTest {
 	public void testAllReduceDriverMutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -176,8 +176,8 @@ public class AllReduceDriverTest {
 				Assert.assertEquals(78, res.f1.getValue());
 			}
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TypeInformation<Tuple2<StringValue, IntValue>> typeInfo = TypeExtractor.getForObject(data.get(0));
@@ -217,7 +217,7 @@ public class AllReduceDriverTest {
 	//  Test UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -227,7 +227,7 @@ public class AllReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 		
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -237,7 +237,7 @@ public class AllReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -247,7 +247,7 @@ public class AllReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
index e5a01d6..a325e33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/GroupReduceDriverTest.java
@@ -22,9 +22,9 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -46,8 +46,8 @@ public class GroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverImmutableEmpty() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -74,8 +74,8 @@ public class GroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverImmutable() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -110,8 +110,8 @@ public class GroupReduceDriverTest {
 	@Test
 	public void testAllReduceDriverMutable() {
 		try {
-			TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-					new TestTaskContext<GenericGroupReduce<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+			TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+					new TestTaskContext<GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 			
 			List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 			TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -149,7 +149,7 @@ public class GroupReduceDriverTest {
 	//  Test UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class ConcatSumReducer extends GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+	public static final class ConcatSumReducer extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
 		public void reduce(Iterator<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
@@ -166,7 +166,7 @@ public class GroupReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumMutableReducer extends GroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumMutableReducer extends RichGroupReduceFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public void reduce(Iterator<Tuple2<StringValue, IntValue>> values, Collector<Tuple2<StringValue, IntValue>> out) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
index 734e119..ae9c294 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceCombineDriverTest.java
@@ -22,9 +22,9 @@ package org.apache.flink.runtime.operators.drivers;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,8 +44,8 @@ public class ReduceCombineDriverTest {
 	@Test
 	public void testImmutableEmpty() {
 		try {
-			TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+			TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
 			context.getTaskConfig().setRelativeMemoryDriver(0.5);
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -81,8 +81,8 @@ public class ReduceCombineDriverTest {
 	public void testReduceDriverImmutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
 				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -112,8 +112,8 @@ public class ReduceCombineDriverTest {
 			}
 			
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>(1024 * 1024);
 				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
@@ -153,8 +153,8 @@ public class ReduceCombineDriverTest {
 	public void testReduceDriverMutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
 				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
@@ -181,8 +181,8 @@ public class ReduceCombineDriverTest {
 				DriverTestData.compareTupleArrays(expected, res);
 			}
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>(1024 * 1024);
 				context.getTaskConfig().setRelativeMemoryDriver(0.5);
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
@@ -220,7 +220,7 @@ public class ReduceCombineDriverTest {
 	//  Test UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -230,7 +230,7 @@ public class ReduceCombineDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 		
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -240,7 +240,7 @@ public class ReduceCombineDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -250,7 +250,7 @@ public class ReduceCombineDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
index e8370cf..28217b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/ReduceDriverTest.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.operators.drivers;
 
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -43,8 +43,8 @@ public class ReduceDriverTest {
 	@Test
 	public void testReduceDriverImmutableEmpty() {
 		try {
-			TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-					new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+			TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+					new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 			
 			List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 			TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -76,8 +76,8 @@ public class ReduceDriverTest {
 	public void testReduceDriverImmutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -104,8 +104,8 @@ public class ReduceDriverTest {
 			}
 			
 			{
-				TestTaskContext<GenericReduce<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
-						new TestTaskContext<GenericReduce<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
+				TestTaskContext<ReduceFunction<Tuple2<String, Integer>>, Tuple2<String, Integer>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<String,Integer>>, Tuple2<String,Integer>>();
 				
 				List<Tuple2<String, Integer>> data = DriverTestData.createReduceImmutableData();
 				TupleTypeInfo<Tuple2<String, Integer>> typeInfo = (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(data.get(0));
@@ -142,8 +142,8 @@ public class ReduceDriverTest {
 	public void testReduceDriverMutable() {
 		try {
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -169,8 +169,8 @@ public class ReduceDriverTest {
 				DriverTestData.compareTupleArrays(expected, res);
 			}
 			{
-				TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
-						new TestTaskContext<GenericReduce<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
+				TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>> context =
+						new TestTaskContext<ReduceFunction<Tuple2<StringValue, IntValue>>, Tuple2<StringValue, IntValue>>();
 				
 				List<Tuple2<StringValue, IntValue>> data = DriverTestData.createReduceMutableData();
 				TupleTypeInfo<Tuple2<StringValue, IntValue>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, IntValue>>) TypeExtractor.getForObject(data.get(0));
@@ -206,7 +206,7 @@ public class ReduceDriverTest {
 	//  Test UDFs
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class ConcatSumFirstReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumFirstReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -216,7 +216,7 @@ public class ReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondReducer extends ReduceFunction<Tuple2<String, Integer>> {
+	public static final class ConcatSumSecondReducer extends RichReduceFunction<Tuple2<String, Integer>> {
 		
 		@Override
 		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
@@ -226,7 +226,7 @@ public class ReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumFirstMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumFirstMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {
@@ -236,7 +236,7 @@ public class ReduceDriverTest {
 		}
 	}
 	
-	public static final class ConcatSumSecondMutableReducer extends ReduceFunction<Tuple2<StringValue, IntValue>> {
+	public static final class ConcatSumSecondMutableReducer extends RichReduceFunction<Tuple2<StringValue, IntValue>> {
 
 		@Override
 		public Tuple2<StringValue, IntValue> reduce(Tuple2<StringValue, IntValue> value1, Tuple2<StringValue, IntValue> value2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
index 303d921..ac8cc78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -39,8 +39,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
@@ -414,7 +412,7 @@ public class HashMatchIteratorITCase {
 				collectIntPairData(input1),
 				collectRecordData(input2));
 			
-			final GenericJoiner<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
 			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
 	
 			// reset the generators
@@ -425,7 +423,7 @@ public class HashMatchIteratorITCase {
 			// compare with iterator values
 			BuildSecondHashMatchIterator<IntPair, Record, Record> iterator = 
 					new BuildSecondHashMatchIterator<IntPair, Record, Record>(
-						input1, input2, this.pairSerializer, this.pairComparator, 
+						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
 						this.memoryManager, this.ioManager, this.parentTask, 1.0);
 			
@@ -461,7 +459,7 @@ public class HashMatchIteratorITCase {
 				collectIntPairData(input1),
 				collectRecordData(input2));
 			
-			final GenericJoiner<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
+			final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
 			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
 	
 			// reset the generators
@@ -675,7 +673,7 @@ public class HashMatchIteratorITCase {
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out)
+		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
 		{
 			TestData.Key key = rec1.getField(0, TestData.Key.class);
 			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
@@ -695,7 +693,7 @@ public class HashMatchIteratorITCase {
 		}
 	}
 	
-	static final class RecordIntPairMatchRemovingMatcher extends AbstractFunction implements GenericJoiner<IntPair, Record, Record>
+	static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
 	{
 		private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
 		
@@ -704,7 +702,7 @@ public class HashMatchIteratorITCase {
 		}
 		
 		@Override
-		public void join(IntPair rec1, Record rec2, Collector<Record> out)
+		public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
 		{
 			final int k = rec1.getKey();
 			final int v = rec1.getValue(); 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index e2de4cb..93e52f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.sort.MergeIterator;
 import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -353,8 +354,7 @@ public class SortMergeMatchIteratorITCase {
 		}
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out)
-		{
+		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
 			TestData.Key key = rec1.getField(0, TestData.Key.class);
 			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
 			TestData.Value value2 = rec2.getField(1, TestData.Value.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 9c177a6..e6661ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
@@ -173,7 +175,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 			
 			// open stub implementation
 			try {
-				this.stub.open(getTaskConfig().getStubParameters());
+				FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
 				stubOpen = true;
 			}
 			catch (Throwable t) {
@@ -185,7 +187,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 			
 			// close. We close here such that a regular close throwing an exception marks a task as failed.
 			if (this.running) {
-				this.stub.close();
+				FunctionUtils.closeFunction (this.stub);
 				stubOpen = false;
 			}
 			
@@ -195,7 +197,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 			// close the input, but do not report any exceptions, since we already have another root cause
 			if (stubOpen) {
 				try {
-					this.stub.close();
+					FunctionUtils.closeFunction(this.stub);
 				}
 				catch (Throwable t) {}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index faa87b5..779640d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
 
 import java.util.List;
 
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.record.io.DelimitedInputFormat;
@@ -80,10 +80,10 @@ public abstract class TaskTestBase {
 		return this.mockEnv.getTaskConfiguration();
 	}
 
-	public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends Function> stubClass) {
+	public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends RichFunction> stubClass) {
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		config.setDriver(driver);
-		config.setStubWrapper(new UserCodeClassWrapper<Function>(stubClass));
+		config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
 		
 		task.setEnvironment(this.mockEnv);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 92b6c44..8b891a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.hash.BuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.sort.MergeMatchIterator;
@@ -250,6 +251,7 @@ public class HashVsSortMiniBenchmark {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) {}
+		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
index 6dfd1e3..c292100 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/functions/CrossFunction.scala
@@ -45,7 +45,7 @@ abstract class CrossFunctionBase[LeftIn: UDT, RightIn: UDT, Out: UDT] extends JC
 }
 
 abstract class CrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossFunctionBase[LeftIn, RightIn, Out] with Function2[LeftIn, RightIn, Out] {
-  override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
+  override def cross(leftRecord: Record, rightRecord: Record) : Record = {
     val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
     val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
     val output = apply(left, right)
@@ -59,31 +59,7 @@ abstract class CrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossF
     leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
 
     serializer.serialize(output, leftRecord)
-    out.collect(leftRecord)
-  }
-}
-
-abstract class FlatCrossFunction[LeftIn: UDT, RightIn: UDT, Out: UDT] extends CrossFunctionBase[LeftIn, RightIn, Out] with Function2[LeftIn, RightIn, Iterator[Out]]  {
-  override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
-    val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
-    val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
-    val output = apply(left, right)
-
-    if (output.nonEmpty) {
-
-      leftRecord.setNumFields(outputLength)
-
-      for (field <- leftDiscard)
-        leftRecord.setNull(field)
-
-      leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
-      leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
-      for (item <- output) {
-        serializer.serialize(item, leftRecord)
-        out.collect(leftRecord)
-      }
-    }
+    leftRecord
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
index 17df2c3..aa04093 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/CrossOperator.scala
@@ -27,7 +27,7 @@ import java.util.{ Iterator => JIterator }
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions.{CrossFunctionBase, CrossFunction, FlatCrossFunction}
+import org.apache.flink.api.scala.functions.{CrossFunctionBase, CrossFunction}//, FlatCrossFunction}
 import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
 import org.apache.flink.api.scala.functions.DeserializingIterator
 import org.apache.flink.api.scala.DataSet
@@ -42,7 +42,7 @@ import org.apache.flink.configuration.Configuration
 
 class CrossDataSet[LeftIn, RightIn](val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
   def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.map[LeftIn, RightIn, Out]
-  def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.flatMap[LeftIn, RightIn, Out]
+  //def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro CrossMacros.flatMap[LeftIn, RightIn, Out]
   def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] = macro CrossMacros.filter[LeftIn, RightIn]
 }
 
@@ -65,7 +65,7 @@ object CrossMacros {
       implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
       implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
       new CrossFunctionBase[LeftIn, RightIn, Out] {
-        override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
+        override def cross(leftRecord: Record, rightRecord: Record) : Record = {
           val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
           val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
           val output = fun.splice.apply(left, right)
@@ -79,7 +79,7 @@ object CrossMacros {
           leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
 
           serializer.serialize(output, leftRecord)
-          out.collect(leftRecord)
+          leftRecord
         }
       }
     }
@@ -106,67 +106,6 @@ object CrossMacros {
     return result
   }
   
-  def flatMap[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = CrossDataSet[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Iterator[Out]]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
-    val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
-    val stub: c.Expr[CrossFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[CrossFunction[LeftIn, RightIn, Out]])
-      reify { fun.splice.asInstanceOf[CrossFunctionBase[LeftIn, RightIn, Out]] }
-    else reify {
-      implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
-      implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-      new CrossFunctionBase[LeftIn, RightIn, Out] {
-        override def cross(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
-          val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
-          val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
-          val output = fun.splice.apply(left, right)
-
-          if (output.nonEmpty) {
-
-            leftRecord.setNumFields(outputLength)
-
-            for (field <- leftDiscard)
-              leftRecord.setNull(field)
-
-            leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
-            leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
-            for (item <- output) {
-              serializer.serialize(item, leftRecord)
-              out.collect(leftRecord)
-            }
-          }
-        }
-      }
-    }
-    val contract = reify {
-      val helper: CrossDataSet[LeftIn, RightIn] = c.prefix.splice
-      val leftInput = helper.leftInput.contract
-      val rightInput = helper.rightInput.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val builder = CrossOperator.builder(generatedStub).input1(leftInput).input2(rightInput)
-      
-      val ret = new CrossOperator(builder) with TwoInputScalaOperator[LeftIn, RightIn, Out] {
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFieldsFirst(
-            Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
-          Annotations.getConstantFieldsSecond(
-            Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
-      }
-      new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
-    }
-
-    val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-    
-    return result
-  }
 
   def filter[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag](c: Context { type PrefixType = CrossDataSet[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Boolean]): c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]] = {
     import c.universe._

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
index 759b444..66d94ba 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/IterateOperators.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.scala.analysis.UDF0
 import org.apache.flink.api.scala.analysis.FieldSelector
 
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.functions.AbstractFunction
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.java.record.operators.BulkIteration
 import org.apache.flink.api.common.operators.base.BulkIterationBase
 import org.apache.flink.api.java.record.operators.DeltaIteration

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index f450483..ccdd52e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
@@ -166,7 +166,7 @@ public abstract class CompilerTestBase {
 		}
 		
 		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+		public <T extends PlanNode> T getNode(String name, Class<? extends RichFunction> stubClass) {
 			List<PlanNode> nodes = this.map.get(name);
 			if (nodes == null || nodes.isEmpty()) {
 				throw new RuntimeException("No node found with the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
index de27390..28bdd01 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.DeltaIteration;
@@ -63,7 +63,7 @@ public class OperatorResolver implements Visitor<Operator<?>> {
 	}
 	
 	@SuppressWarnings("unchecked")
-	public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
+	public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) {
 		List<Operator<?>> nodes = this.map.get(name);
 		if (nodes == null || nodes.isEmpty()) {
 			throw new RuntimeException("No node found with the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index e6ce7c1..950e78e 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -35,6 +35,7 @@ under the License.
 
 	<packaging>jar</packaging>
 
+
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -184,7 +185,6 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
-
 		</plugins>
 		
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index 15cfac3..e0a96b3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -106,7 +106,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 		return result;
 	}
 	
-	public static final class SummingJoin extends JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+	public static final class SummingJoin extends RichJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		@Override
 		public Tuple2<Long, Double> join(Tuple2<Long, Double> first, Tuple2<Long, Double> second) {
@@ -114,7 +114,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	public static final class SummingJoinProject extends JoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+	public static final class SummingJoinProject extends RichJoinFunction<Tuple3<Long, Double, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		@Override
 		public Tuple2<Long, Double> join(Tuple3<Long, Double, Double> first, Tuple2<Long, Double> second) {
@@ -122,7 +122,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	public static final class Duplicator extends FlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
+	public static final class Duplicator extends RichFlatMapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		@Override
 		public void flatMap(Tuple2<Long, Double> value, Collector<Tuple2<Long, Double>> out) {
@@ -131,7 +131,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 		}
 	}
 	
-	public static final class Expander extends MapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
+	public static final class Expander extends RichMapFunction<Tuple2<Long, Double>, Tuple3<Long, Double, Double>> {
 
 		@Override
 		public Tuple3<Long, Double, Double> map(Tuple2<Long, Double> value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index 857f321..35aea4b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
@@ -61,7 +61,7 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
 	}
 
 	
-	public static class PickOneAllReduce extends GroupReduceFunction<Integer, Integer> {
+	public static class PickOneAllReduce extends RichGroupReduceFunction<Integer, Integer> {
 		
 		private Integer bcValue;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 31540dc..49bdb26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -23,9 +23,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -90,7 +90,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 	//  The test program
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class VertexParser extends MapFunction<String, Long> {
+	public static final class VertexParser extends RichMapFunction<String, Long> {
 
 		@Override
 		public Long map(String value) throws Exception {
@@ -98,7 +98,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 		}
 	}
 	
-	public static final class EdgeParser extends FlatMapFunction<String, Tuple2<Long, Long>> {
+	public static final class EdgeParser extends RichFlatMapFunction<String, Tuple2<Long, Long>> {
 
 		@Override
 		public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -113,7 +113,7 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase
 
 	@ConstantFieldsFirst("0")
 	@ConstantFieldsSecond("0")
-	public static final class MinIdAndUpdate extends CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		
 		@Override
 		public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
index be24662..d2669aa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DependencyConnectedComponentsITCase.java
@@ -23,9 +23,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -153,8 +153,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class FindCandidatesJoin extends JoinFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
+	public static final class FindCandidatesJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long> {
 		
 		private static final long serialVersionUID = 1L;
 		
@@ -166,7 +165,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class RemoveDuplicatesReduce extends GroupReduceFunction<Long, Long> {
+	public static final class RemoveDuplicatesReduce extends RichGroupReduceFunction<Long, Long> {
 		
 		private static final long serialVersionUID = 1L;
 
@@ -176,8 +175,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class FindCandidatesDependenciesJoin extends JoinFunction
-		<Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
+	public static final class FindCandidatesDependenciesJoin extends RichJoinFunction<Long, Tuple2<Long, Long>,Tuple2<Long, Long>> {
 	
 		private static final long serialVersionUID = 1L;
 		
@@ -187,8 +185,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class NeighborWithComponentIDJoin extends JoinFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class NeighborWithComponentIDJoin extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 	
 		private static final long serialVersionUID = 1L;
 		
@@ -201,8 +198,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 	
-	public static final class MinimumReduce extends GroupReduceFunction
-		<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+	public static final class MinimumReduce extends RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		
 		private static final long serialVersionUID = 1L;
 		final Tuple2<Long, Long> resultVertex = new Tuple2<Long, Long>();
@@ -228,8 +224,7 @@ public class DependencyConnectedComponentsITCase extends JavaProgramTestBase {
 		}
 	}
 
-	public static final class MinimumIdFilter extends FlatMapFunction
-		<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
+	public static final class MinimumIdFilter extends RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {
 	
 		private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index d719cc5..9ea33d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -28,8 +28,8 @@ import org.junit.Assert;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -287,7 +287,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class SubtractOneMap extends MapFunction<Integer, Integer> {
+	public static final class SubtractOneMap extends RichMapFunction<Integer, Integer> {
 
 		private LongSumAggregator aggr;
 
@@ -309,7 +309,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class SubtractOneMapWithParam extends MapFunction<Integer, Integer> {
+	public static final class SubtractOneMapWithParam extends RichMapFunction<Integer, Integer> {
 
 		private LongSumAggregatorWithParameter aggr;
 
@@ -345,7 +345,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class TupleMakerMap extends MapFunction<Integer, Tuple2<Integer, Integer>> {
+	public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
 
 		@Override
 		public Tuple2<Integer, Integer> map(Integer value) throws Exception {
@@ -357,7 +357,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AggregateMapDelta extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+	public static final class AggregateMapDelta extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 
 		private LongSumAggregator aggr;
 		private LongValue previousAggr;
@@ -388,8 +388,8 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFilter extends FlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, 
-		Tuple2<Integer, Integer>> {
+	public static final class UpdateFilter extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
+			Tuple2<Integer, Integer>> {
 
 		private int superstep;
 
@@ -411,7 +411,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class ProjectSecondMapper extends MapFunction<Tuple2<Integer, Integer>, Integer> {
+	public static final class ProjectSecondMapper extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
 
 		@Override
 		public Integer map(Tuple2<Integer, Integer> value) {
@@ -420,7 +420,7 @@ public class AggregatorsITCase extends JavaProgramTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AggregateMapDeltaWithParam extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+	public static final class AggregateMapDeltaWithParam extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
 
 		private LongSumAggregatorWithParameter aggr;
 		private LongValue previousAggr;