You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/09 14:39:48 UTC

[17/39] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index da0f222..0727d63 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction.
  * 
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
 	
 	private static final Log LOG = LogFactory.getLog(AllGroupReduceDriver.class);
 
-	private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+	private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
 	
 	private MutableObjectIterator<IT> input;
 
@@ -54,7 +54,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) {
+	public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 	}
 	
@@ -64,9 +64,9 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
 	}
 
 	@Override
-	public Class<GenericGroupReduce<IT, OT>> getStubType() {
+	public Class<GroupReduceFunction<IT, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+		final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
 		return clazz;
 	}
 
@@ -83,8 +83,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
 		this.strategy = config.getDriverStrategy();
 		
 		if (strategy == DriverStrategy.ALL_GROUP_COMBINE) {
-			if (!(this.taskContext.getStub() instanceof GenericCombine)) {
-				throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GenericCombine.class.getName());
+			if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) {
+				throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName());
 			}
 		}
 		else if (strategy != DriverStrategy.ALL_GROUP_REDUCE) {
@@ -105,13 +105,13 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu
 		// single UDF call with the single group
 		if (inIter.hasNext()) {
 			if (strategy == DriverStrategy.ALL_GROUP_REDUCE) {
-				final GenericGroupReduce<IT, OT> reducer = this.taskContext.getStub();
+				final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub();
 				final Collector<OT> output = this.taskContext.getOutputCollector();
 				reducer.reduce(inIter, output);
 			}
 			else {
 				@SuppressWarnings("unchecked")
-				final GenericCombine<IT> combiner = (GenericCombine<IT>) this.taskContext.getStub();
+				final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub();
 				@SuppressWarnings("unchecked")
 				final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector();
 				combiner.combine(inIter, output);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 30bfae3..721f4f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -35,13 +35,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	
 	private static final Log LOG = LogFactory.getLog(AllReduceDriver.class);
 
-	private PactTaskContext<GenericReduce<T>, T> taskContext;
+	private PactTaskContext<ReduceFunction<T>, T> taskContext;
 	
 	private MutableObjectIterator<T> input;
 
@@ -52,7 +52,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -63,9 +63,9 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	}
 
 	@Override
-	public Class<GenericReduce<T>> getStubType() {
+	public Class<ReduceFunction<T>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+		final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
 		return clazz;
 	}
 
@@ -94,7 +94,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 			LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code."));
 		}
 
-		final GenericReduce<T> stub = this.taskContext.getStub();
+		final ReduceFunction<T> stub = this.taskContext.getStub();
 		final MutableObjectIterator<T> input = this.input;
 		final TypeSerializer<T> serializer = this.serializer;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 3da451a..8ff0262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,12 +41,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.java.record.functions.CoGroupFunction
  */
-public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
 	
 	private static final Log LOG = LogFactory.getLog(CoGroupDriver.class);
 	
 	
-	private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+	private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CoGroupTaskIterator<IT1, IT2> coGroupIterator;				// the iterator that does the actual cogroup
 	
@@ -56,7 +56,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
 
 
 	@Override
-	public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -69,9 +69,9 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
 	
 
 	@Override
-	public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+	public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+		final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
 		return clazz;
 	}
 	
@@ -122,7 +122,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<
 	@Override
 	public void run() throws Exception
 	{
-		final GenericCoGrouper<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
+		final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 9d06618..8761a2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
-public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+	private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT1> hashTable;
 	
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	}
 	
 	@Override
-	public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+	public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+		final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
 		return clazz;
 	}
 	
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	@Override
 	public void run() throws Exception {
 
-		final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub();
+		final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
 		
 		IT1 buildSideRecord = solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 80fa855..f2020c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
-public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+	private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT2> hashTable;
 	
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	}
 	
 	@Override
-	public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+	public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+		final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
 		return clazz;
 	}
 	
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	@Override
 	public void run() throws Exception {
 
-		final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub();
+		final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
 		
 		IT2 buildSideRecord = solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 7c311ed..b68f24d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
@@ -40,12 +40,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.java.functions.CrossFunction
  */
-public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2, OT>, OT> {
+public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, OT>, OT> {
 	
 	private static final Log LOG = LogFactory.getLog(CrossDriver.class);
 	
 	
-	private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext;
+	private PactTaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
 	
 	private MemoryManager memManager;
 	
@@ -67,7 +67,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 
 
 	@Override
-	public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> context) {
+	public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -80,9 +80,9 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 
 
 	@Override
-	public Class<GenericCrosser<T1, T2, OT>> getStubType() {
+	public Class<CrossFunction<T1, T2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericCrosser<T1, T2, OT>> clazz = (Class<GenericCrosser<T1, T2, OT>>) (Class<?>) GenericCrosser.class;
+		final Class<CrossFunction<T1, T2, OT>> clazz = (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
 		return clazz;
 	}
 	
@@ -207,7 +207,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 		final T2 val2Reuse = serializer2.createInstance();
 		T2 val2Copy = serializer2.createInstance();
 		
-		final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		// for all blocks
@@ -217,7 +217,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 				// for all values in the block
 				while ((val1 = blockVals.next(val1Reuse)) != null) {
 					val2Copy = serializer2.copy(val2, val2Copy);
-					crosser.cross(val1, val2Copy, collector);
+					collector.collect(crosser.cross(val1,val2Copy));
+					//crosser.cross(val1, val2Copy, collector);
 				}
 				blockVals.reset();
 			}
@@ -254,7 +255,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 		T2 val2;
 		final T2 val2Reuse = serializer2.createInstance();
 
-		final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		// for all blocks
@@ -264,7 +265,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 				// for all values in the block
 				while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) {
 					val1Copy = serializer1.copy(val1, val1Copy);
-					crosser.cross(val1Copy, val2, collector);
+					collector.collect(crosser.cross(val1Copy, val2));
+					//crosser.cross(val1Copy, val2, collector);
 				}
 				blockVals.reset();
 			}
@@ -296,7 +298,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 		T2 val2;
 		final T2 val2Reuse = serializer2.createInstance();
 
-		final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		// for all blocks
@@ -304,7 +306,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 			// for all values from the spilling side
 			while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) {
 				val1Copy = serializer1.copy(val1, val1Copy);
-				crosser.cross(val1Copy, val2, collector);
+				collector.collect(crosser.cross(val1Copy, val2));
+				//crosser.cross(val1Copy, val2, collector);
 			}
 			spillVals.reset();
 		}
@@ -332,7 +335,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 		final T2 val2Reuse = serializer2.createInstance();
 		T2 val2Copy = serializer2.createInstance();
 		
-		final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub();
+		final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		// for all blocks
@@ -340,7 +343,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2
 			// for all values from the spilling side
 			while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
 				val2Copy = serializer2.copy(val2, val2Copy);
-				crosser.cross(val1, val2Copy, collector);
+				collector.collect(crosser.cross(val1, val2Copy));
+				//crosser.cross(val1, val2Copy, collector);
 			}
 			spillVals.reset();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index 44f22a0..130601b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
  * of the MapFunction.
  * 
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  * 
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>, OT> {
+public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>, OT> {
 	
-	private PactTaskContext<GenericFlatMap<IT, OT>, OT> taskContext;
+	private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
 	
 	private volatile boolean running;
 	
 	
 	@Override
-	public void setup(PactTaskContext<GenericFlatMap<IT, OT>, OT> context) {
+	public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -55,9 +55,9 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>,
 	}
 
 	@Override
-	public Class<GenericFlatMap<IT, OT>> getStubType() {
+	public Class<FlatMapFunction<IT, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericFlatMap<IT, OT>> clazz = (Class<GenericFlatMap<IT, OT>>) (Class<?>) GenericFlatMap.class;
+		final Class<FlatMapFunction<IT, OT>> clazz = (Class<FlatMapFunction<IT, OT>>) (Class<?>) FlatMapFunction.class;
 		return clazz;
 	}
 
@@ -75,7 +75,7 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>,
 	public void run() throws Exception {
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
-		final GenericFlatMap<IT, OT> function = this.taskContext.getStub();
+		final FlatMapFunction<IT, OT> function = this.taskContext.getStub();
 		final Collector<OT> output = this.taskContext.getOutputCollector();
 
 		IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 1d0749c..f786c56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -39,12 +39,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type consumed and produced by the combiner.
  */
-public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>, T> {
+public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFunction<T>, T> {
 	
 	private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
 
 	
-	private PactTaskContext<GenericCombine<T>, T> taskContext;
+	private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
 	
 	private CloseableInputProvider<T> input;
 
@@ -57,7 +57,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericCombine<T>, T> context) {
+	public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -68,9 +68,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
 	}
 
 	@Override
-	public Class<GenericCombine<T>> getStubType() {
+	public Class<FlatCombineFunction<T>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericCombine<T>> clazz = (Class<GenericCombine<T>>) (Class<?>) GenericCombine.class;
+		final Class<FlatCombineFunction<T>> clazz = (Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class;
 		return clazz;
 	}
 
@@ -111,7 +111,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>
 				this.serializerFactory.getSerializer(), this.comparator);
 
 		// cache references on the stack
-		final GenericCombine<T> stub = this.taskContext.getStub();
+		final FlatCombineFunction<T> stub = this.taskContext.getStub();
 		final Collector<T> output = this.taskContext.getOutputCollector();
 
 		// run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 1ab080c..960143d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction.
  * 
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
 	
 	private static final Log LOG = LogFactory.getLog(GroupReduceDriver.class);
 
-	private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+	private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
 	
 	private MutableObjectIterator<IT> input;
 
@@ -56,7 +56,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) {
+	public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -67,9 +67,9 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
 	}
 
 	@Override
-	public Class<GenericGroupReduce<IT, OT>> getStubType() {
+	public Class<GroupReduceFunction<IT, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+		final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
 		return clazz;
 	}
 
@@ -100,7 +100,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<
 		final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
 
 		// cache references on the stack
-		final GenericGroupReduce<IT, OT> stub = this.taskContext.getStub();
+		final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
 		final Collector<OT> output = this.taskContext.getOutputCollector();
 
 		// run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index b23b0cd..342f307 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+	private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT1> hashTable;
 	
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	}
 	
 	@Override
-	public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
 		return clazz;
 	}
 	
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	@Override
 	public void run() throws Exception {
 
-		final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub();
+		final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
 		
 		IT1 buildSideRecord = this.solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 4fa5c5a..c38a81a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+	private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT2> hashTable;
 	
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	}
 	
 	@Override
-	public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
 		return clazz;
 	}
 	
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	@Override
 	public void run() throws Exception {
 
-		final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub();
+		final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub();
 		final Collector<OT> collector = taskContext.getOutputCollector();
 		
 		IT2 buildSideRecord = this.solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index fe1e0c1..89fbf4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
  * of the MapFunction.
  * 
- * @see GenericMap
+ * @see org.apache.flink.api.common.functions.MapFunction
  * 
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
+public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
 	
-	private PactTaskContext<GenericMap<IT, OT>, OT> taskContext;
+	private PactTaskContext<MapFunction<IT, OT>, OT> taskContext;
 	
 	private volatile boolean running;
 	
 	
 	@Override
-	public void setup(PactTaskContext<GenericMap<IT, OT>, OT> context) {
+	public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -55,9 +55,9 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
 	}
 
 	@Override
-	public Class<GenericMap<IT, OT>> getStubType() {
+	public Class<MapFunction<IT, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericMap<IT, OT>> clazz = (Class<GenericMap<IT, OT>>) (Class<?>) GenericMap.class;
+		final Class<MapFunction<IT, OT>> clazz = (Class<MapFunction<IT, OT>>) (Class<?>) MapFunction.class;
 		return clazz;
 	}
 
@@ -75,7 +75,7 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
 	public void run() throws Exception {
 		// cache references on the stack
 		final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
-		final GenericMap<IT, OT> function = this.taskContext.getStub();
+		final MapFunction<IT, OT> function = this.taskContext.getStub();
 		final Collector<OT> output = this.taskContext.getOutputCollector();
 
 		IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index a29afa2..e205f1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,13 +42,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MatchTask matches all pairs of records that share the same key and come from different inputs. Each pair of 
  * matching records is handed to the <code>match()</code> method of the JoinFunction.
  * 
- * @see GenericJoiner
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
 	protected static final Log LOG = LogFactory.getLog(MatchDriver.class);
 	
-	protected PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;		// the iterator that does the actual matching
 	
@@ -57,7 +57,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) {
+	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -68,9 +68,9 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
 	}
 
 	@Override
-	public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
 		return clazz;
 	}
 	
@@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1,
 
 	@Override
 	public void run() throws Exception {
-		final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		final JoinTaskIterator<IT1, IT2, OT> matchIterator = this.matchIterator;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index ffe27e6..33d8a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -28,15 +28,15 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type.
  */
-public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> {
+public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
 	
-	private PactTaskContext<AbstractFunction, T> taskContext;
+	private PactTaskContext<AbstractRichFunction, T> taskContext;
 	
 	private volatile boolean running;
 	
 	
 	@Override
-	public void setup(PactTaskContext<AbstractFunction, T> context) {
+	public void setup(PactTaskContext<AbstractRichFunction, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -47,7 +47,7 @@ public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> {
 	}
 	
 	@Override
-	public Class<AbstractFunction> getStubType() {
+	public Class<AbstractRichFunction> getStubType() {
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 4d72085..87cea30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -44,7 +44,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type consumed and produced by the combiner.
  */
-public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	
 	private static final Log LOG = LogFactory.getLog(ReduceCombineDriver.class);
 
@@ -52,13 +52,13 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 	
 	
-	private PactTaskContext<GenericReduce<T>, T> taskContext;
+	private PactTaskContext<ReduceFunction<T>, T> taskContext;
 
 	private TypeSerializer<T> serializer;
 
 	private TypeComparator<T> comparator;
 	
-	private GenericReduce<T> reducer;
+	private ReduceFunction<T> reducer;
 	
 	private Collector<T> output;
 	
@@ -75,7 +75,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -86,9 +86,9 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	}
 
 	@Override
-	public Class<GenericReduce<T>> getStubType() {
+	public Class<ReduceFunction<T>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+		final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
 		return clazz;
 	}
 
@@ -168,7 +168,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> {
 			final TypeSerializer<T> serializer = this.serializer;
 			final TypeComparator<T> comparator = this.comparator;
 			
-			final GenericReduce<T> function = this.reducer;
+			final ReduceFunction<T> function = this.reducer;
 			
 			final Collector<T> output = this.output;
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index a7e9305..9495cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -36,13 +36,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	
 	private static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
-	private PactTaskContext<GenericReduce<T>, T> taskContext;
+	private PactTaskContext<ReduceFunction<T>, T> taskContext;
 	
 	private MutableObjectIterator<T> input;
 
@@ -55,7 +55,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -66,9 +66,9 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 	}
 
 	@Override
-	public Class<GenericReduce<T>> getStubType() {
+	public Class<ReduceFunction<T>> getStubType() {
 		@SuppressWarnings("unchecked")
-		final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class;
+		final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
 		return clazz;
 	}
 
@@ -101,7 +101,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
 		final TypeSerializer<T> serializer = this.serializer;
 		final TypeComparator<T> comparator = this.comparator;
 		
-		final GenericReduce<T> function = this.taskContext.getStub();
+		final ReduceFunction<T> function = this.taskContext.getStub();
 		
 		final Collector<T> output = this.taskContext.getOutputCollector();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 45fe05a..c8f217c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -24,8 +24,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -497,7 +498,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			if (this.stub != null) {
 				try {
 					Configuration stubConfig = this.config.getStubParameters();
-					this.stub.open(stubConfig);
+					FunctionUtils.openFunction(this.stub, stubConfig);
 					stubOpen = true;
 				}
 				catch (Throwable t) {
@@ -510,7 +511,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 			// close. We close here such that a regular close throwing an exception marks a task as failed.
 			if (this.running && this.stub != null) {
-				this.stub.close();
+				FunctionUtils.closeFunction(this.stub);
 				stubOpen = false;
 			}
 
@@ -525,15 +526,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// modify accumulators.ll;
 			if (this.stub != null) {
 				// collect the counters from the stub
-				Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators();
-				RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
+				if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
+					Map<String, Accumulator<?, ?>> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
+					RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
+				}
 			}
 		}
 		catch (Exception ex) {
 			// close the input, but do not report any exceptions, since we already have another root cause
 			if (stubOpen) {
 				try {
-					this.stub.close();
+					FunctionUtils.closeFunction(this.stub);
 				}
 				catch (Throwable t) {}
 			}
@@ -582,9 +585,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// tasks. Type conflicts can occur here if counters with same name but
 		// different type were used.
 
+
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			Map<String, Accumulator<?, ?>> chainedAccumulators = chainedTask.getStub().getRuntimeContext().getAllAccumulators();
-			AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+				Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
+				AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+			}
 		}
 
 		// Don't report if the UDF didn't collect any accumulators
@@ -607,7 +613,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// done before sending
 		AccumulatorHelper.resetAndClearAccumulators(accumulators);
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			AccumulatorHelper.resetAndClearAccumulators(chainedTask.getStub().getRuntimeContext().getAllAccumulators());
+			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+				AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators());
+			}
 		}
 	}
 
@@ -693,7 +701,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + 
 						stubSuperClass.getName() + "' as is required.");
 			}
-			stub.setRuntimeContext(this.runtimeUdfContext);
+			FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
 			return stub;
 		}
 		catch (ClassCastException ccex) {
@@ -988,13 +996,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 						e.getMessage() == null ? "." : ": " + e.getMessage(), e);
 				}
 				
-				if (!(localStub instanceof GenericCombine)) {
+				if (!(localStub instanceof FlatCombineFunction)) {
 					throw new IllegalStateException("Performing combining sort outside a reduce task!");
 				}
 
 				@SuppressWarnings({ "rawtypes", "unchecked" })
 				CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
-					(GenericCombine) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], 
+					(FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
 					this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
 					this.config.getSpillingThresholdInput(inputNum));
@@ -1375,7 +1383,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Opens the given stub using its {@link Function#open(Configuration)} method. If the open call produces
+	 * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces
 	 * an exception, a new exception with a standard error message is created, using the encountered exception
 	 * as its cause.
 	 * 
@@ -1386,14 +1394,14 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	public static void openUserCode(Function stub, Configuration parameters) throws Exception {
 		try {
-			stub.open(parameters);
+			FunctionUtils.openFunction(stub, parameters);
 		} catch (Throwable t) {
 			throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t);
 		}
 	}
 	
 	/**
-	 * Closes the given stub using its {@link Function#close()} method. If the close call produces
+	 * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces
 	 * an exception, a new exception with a standard error message is created, using the encountered exception
 	 * as its cause.
 	 * 
@@ -1403,7 +1411,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	public static void closeUserCode(Function stub) throws Exception {
 		try {
-			stub.close();
+			FunctionUtils.closeFunction(stub);
 		} catch (Throwable t) {
 			throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t);
 		}
@@ -1505,4 +1513,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 		return a;
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 3f7ad61..3dbab78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -61,7 +61,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
 	// --------------------------------------------------------------------------------------------
 
-	public Function getStub() {
+	public RichFunction getStub() {
 		return this.mapper;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index cca6838..db134a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -19,25 +19,26 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 
 public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
-	private GenericFlatMap<IT, OT> mapper;
+	private FlatMapFunction<IT, OT> mapper;
 
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
-		final GenericFlatMap<IT, OT> mapper =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericFlatMap.class);
+		final FlatMapFunction<IT, OT> mapper =
+			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class);
 		this.mapper = mapper;
-		mapper.setRuntimeContext(getUdfRuntimeContext());
+		FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
 	}
 
 	@Override
@@ -54,8 +55,9 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void cancelTask() {
 		try {
-			this.mapper.close();
-		} catch (Throwable t) {
+			FunctionUtils.closeFunction(this.mapper);
+		}
+		catch (Throwable t) {
 		}
 	}
 
@@ -84,4 +86,5 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	public void close() {
 		this.outputCollector.close();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 3ae324c..a7b1048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -20,24 +20,25 @@
 package org.apache.flink.runtime.operators.chaining;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 
 public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
-	private GenericMap<IT, OT> mapper;
+	private MapFunction<IT, OT> mapper;
 
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public void setup(AbstractInvokable parent) {
 		@SuppressWarnings("unchecked")
-		final GenericMap<IT, OT> mapper =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericMap.class);
+		final MapFunction<IT, OT> mapper =
+			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class);
 		this.mapper = mapper;
-		mapper.setRuntimeContext(getUdfRuntimeContext());
+		FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext());
 	}
 
 	@Override
@@ -54,7 +55,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	@Override
 	public void cancelTask() {
 		try {
-			this.mapper.close();
+			FunctionUtils.closeFunction(this.mapper);
 		} catch (Throwable t) {
 		}
 	}
@@ -84,4 +85,5 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 	public void close() {
 		this.outputCollector.close();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 76d860b..3a42469 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.BulkIterationBase.TerminationCriterionAggregator;
@@ -47,7 +47,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT,
 
 	// --------------------------------------------------------------------------------------------
 
-	public Function getStub() {
+	public RichFunction getStub() {
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index d5ce0a7..ffac151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -22,8 +22,9 @@ package org.apache.flink.runtime.operators.chaining;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,7 +52,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 
 	private InMemorySorter<T> sorter;
 
-	private GenericCombine<T> combiner;
+	private FlatCombineFunction<T> combiner;
 
 	private TypeSerializer<T> serializer;
 
@@ -72,10 +73,10 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		this.parent = parent;
 
 		@SuppressWarnings("unchecked")
-		final GenericCombine<T> combiner =
-			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCombine.class);
+		final FlatCombineFunction<T> combiner =
+			RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
 		this.combiner = combiner;
-		combiner.setRuntimeContext(getUdfRuntimeContext());
+		FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext());
 	}
 
 	@Override
@@ -185,7 +186,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 				this.comparator);
 
 			// cache references on the stack
-			final GenericCombine<T> stub = this.combiner;
+			final FlatCombineFunction<T> stub = this.combiner;
 			final Collector<T> output = this.outputCollector;
 
 			// run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
index 65af050..0daf69b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -101,7 +101,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator<
 	}
 
 	@Override
-	public final boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector)
+	public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
 	throws Exception
 	{
 		if (this.hashJoin.nextRecord())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
index e124201..70e5afb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -100,7 +100,7 @@ public class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIterator
 	}
 
 	@Override
-	public boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector)
+	public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
 	throws Exception
 	{
 		if (this.hashJoin.nextRecord())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 06d0ac7..da8a11b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -29,7 +29,8 @@ import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -71,7 +72,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 */
 	private static final Log LOG = LogFactory.getLog(CombiningUnilateralSortMerger.class);
 
-	private final GenericCombine<E> combineStub;	// the user code stub that does the combining
+	private final FlatCombineFunction<E> combineStub;	// the user code stub that does the combining
 	
 	private Configuration udfConfig;
 	
@@ -101,7 +102,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
@@ -133,7 +134,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
 	 *                                   perform the sort.
 	 */
-	public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
+	public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
@@ -254,12 +255,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 			
 			// now that we are actually spilling, take the combiner, and open it
 			try {
-				Configuration conf = CombiningUnilateralSortMerger.this.udfConfig; 
-				combineStub.open(conf == null ? new Configuration() : conf);
+				Configuration conf = CombiningUnilateralSortMerger.this.udfConfig;
+				FunctionUtils.openFunction (combineStub, (conf == null ? new Configuration() : conf));
 			}
 			catch (Throwable t) {
 				throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
@@ -380,7 +381,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// close the user code
 			try {
-				combineStub.close();
+				FunctionUtils.closeFunction(combineStub);
 			}
 			catch (Throwable t) {
 				throw new IOException("The user-defined combiner failed in its 'close()' method.", t);
@@ -466,7 +467,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 																			this.memManager.getPageSize());
 			
 			final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer);
-			final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
+			final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub;
 
 			// combine and write to disk
 			try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
index 308e333..2ed75ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -151,7 +151,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey()
 	 */
 	@Override
-	public boolean callWithNextKey(final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
 	throws Exception
 	{
 		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
@@ -234,7 +234,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 	 * @throws Exception Forwards all exceptions thrown by the stub.
 	 */
 	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-			final Iterator<T2> valsN, final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
 	throws Exception
 	{
 		this.copy1 = this.serializer1.copy(val1, this.copy1);
@@ -267,7 +267,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 	 * @throws Exception Forwards all exceptions thrown by the stub.
 	 */
 	private void crossSecond1withNValues(T2 val1, T1 firstValN,
-			Iterator<T1> valsN, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector)
+			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
 	throws Exception
 	{
 		this.copy2 = this.serializer2.copy(val1, this.copy2);
@@ -280,7 +280,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 			
 			if (valsN.hasNext()) {
 				this.copy2 = this.serializer2.copy(val1, this.copy2);
-				matchFunction.join(nRec, this.copy2, collector);
+				matchFunction.join(nRec,this.copy2,collector);
 			} else {
 				matchFunction.join(nRec, val1, collector);
 				more = false;
@@ -297,7 +297,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 	 */
 	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
 			final T2 firstV2, final Iterator<T2> blockVals,
-			final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector)
+			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
 	throws Exception
 	{
 		// ==================================================
@@ -411,7 +411,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O
 						// get instances of key and block value
 						final T2 nextBlockVal = this.blockIt.next();
 						this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-						matchFunction.join(this.copy1, nextBlockVal, collector);	
+						matchFunction.join(this.copy1, nextBlockVal, collector);
 					}
 					
 					// reset block iterator

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
index 6bcb6ee..3ed647d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.util;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.util.Collector;
 
@@ -60,7 +60,7 @@ public interface JoinTaskIterator<V1, V2, O>
 	 * @return True, if a next key exists, false if no more keys exist.
 	 * @throws Exception Exceptions from the user code are forwarded.
 	 */
-	boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) throws Exception;
+	boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector) throws Exception;
 	
 	/**
 	 * Aborts the matching process. This extra abort method is supplied, because a significant time may pass while

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index e91e100..3894233 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -23,13 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -43,7 +40,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class CachedMatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record, Record>>
+public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>>
 {
 	private static final long HASH_MEM = 6*1024*1024;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 46717be..5551485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -23,12 +23,9 @@ import java.util.Iterator;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -37,7 +34,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<Record, Record, Record>>
+public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Record, Record, Record>>
 {
 	private static final long SORT_MEM = 3*1024*1024;
 	
@@ -87,7 +84,7 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<R
 		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
 	}
 	
-	public static final class MockCoGroupStub extends CoGroupFunction {
+	public static final class MockCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction {
 		private static final long serialVersionUID = 1L;
 		
 		private final Record res = new Record();