You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/01 09:29:26 UTC

[10/22] [FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction stubs for all UDFs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 5ca1068..a07a157 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -22,8 +22,10 @@ import java.security.InvalidParameterException;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
@@ -32,19 +34,22 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingJoinOperator;
 import org.apache.flink.api.java.operators.translation.TupleKeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.TypeInformation;
 
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.util.Collector;
 //CHECKSTYLE.ON: AvoidStarImport
 
 /**
@@ -147,12 +152,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	 * @param <I2> The type of the second input DataSet of the Join transformation.
 	 * @param <OUT> The type of the result of the Join transformation.
 	 * 
-	 * @see JoinFunction
+	 * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
 	 * @see DataSet
 	 */
 	public static class EquiJoin<I1, I2, OUT> extends JoinOperator<I1, I2, OUT> {
 		
-		private final JoinFunction<I1, I2, OUT> function;
+		private final FlatJoinFunction<I1, I2, OUT> function;
 		
 		@SuppressWarnings("unused")
 		private boolean preserve1;
@@ -160,7 +165,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		private boolean preserve2;
 		
 		protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2, 
-				Keys<I1> keys1, Keys<I2> keys2, JoinFunction<I1, I2, OUT> function,
+				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
 				TypeInformation<OUT> returnType, JoinHint hint)
 		{
 			super(input1, input2, keys1, keys2, returnType, hint);
@@ -171,14 +176,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.function = function;
 
-			if (!(function instanceof ProjectJoinFunction)) {
+			if (!(function instanceof ProjectFlatJoinFunction)) {
 				extractSemanticAnnotationsFromUdf(function.getClass());
 			} else {
-				generateProjectionProperties(((ProjectJoinFunction<?, ?, ?>) function));
+				generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) function));
 			}
 		}
 
-		public void generateProjectionProperties(ProjectJoinFunction<?, ?, ?> pjf) {
+		protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
+				TypeInformation<OUT> returnType, JoinHint hint)
+		{
+			super(input1, input2, keys1, keys2, returnType, hint);
+
+			if (function == null) {
+				throw new NullPointerException();
+			}
+
+			this.function = generatedFunction;
+
+			if (!(generatedFunction instanceof ProjectFlatJoinFunction)) {
+				extractSemanticAnnotationsFromUdf(function.getClass());
+			} else {
+				generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) generatedFunction));
+			}
+		}
+
+		public void generateProjectionProperties(ProjectFlatJoinFunction<?, ?, ?> pjf) {
 			DualInputSemanticProperties props = SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), pjf.getIsFromFirst());
 			setSemanticProperties(props);
 		}
@@ -238,8 +262,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				int[] logicalKeyPositions1 = super.keys1.computeLogicalKeyPositions();
 				int[] logicalKeyPositions2 = super.keys2.computeLogicalKeyPositions();
 				
-				JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>> po =
-						new JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, I2, OUT>>(function,
+				JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>> po =
+						new JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>>(function,
 								new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
 								logicalKeyPositions1, logicalKeyPositions2,
 								name);
@@ -298,7 +322,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		
 		private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoin(
 				Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2, 
-				JoinFunction<I1, I2, OUT> function, 
+				FlatJoinFunction<I1, I2, OUT> function,
 				TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
 				Operator<I1> input1, Operator<I2> input2)
 		{
@@ -313,10 +337,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
 			final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
 
-			final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
-					new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-			final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
-					new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+			final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+					new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+			final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+					new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
 			final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
 			
 			join.setFirstInput(keyMapper1);
@@ -333,7 +357,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		
 		private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinRight(
 				int[] logicalKeyPositions1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2, 
-				JoinFunction<I1, I2, OUT> function, 
+				FlatJoinFunction<I1, I2, OUT> function,
 				TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
 				Operator<I1> input1, Operator<I2> input2)
 		{
@@ -350,10 +374,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			final TupleKeyExtractingMapper<I1, K> extractor1 = new TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]);
 			final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
 
-			final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
-					new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-			final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
-					new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+			final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+					new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+			final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+					new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
 			
 			final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
 			
@@ -371,7 +395,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		
 		private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinLeft(
 				Keys.SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
-				JoinFunction<I1, I2, OUT> function, 
+				FlatJoinFunction<I1, I2, OUT> function,
 				TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
 				Operator<I1> input1, Operator<I2> input2)
 		{
@@ -388,10 +412,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
 			final TupleKeyExtractingMapper<I2, K> extractor2 = new TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]);
 
-			final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>> keyMapper1 =
-					new MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-			final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>> keyMapper2 =
-					new MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+			final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+					new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+			final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+					new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
 			
 			final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
 			
@@ -424,28 +448,73 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
 		{
 			super(input1, input2, keys1, keys2, 
-				(JoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultJoinFunction<I1, I2>(),
+				(RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
 				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint);
 		}
 		
 		/**
-		 * Finalizes a Join transformation by applying a {@link JoinFunction} to each pair of joined elements.<br/>
+		 * Finalizes a Join transformation by applying a {@link org.apache.flink.api.java.functions.RichFlatJoinFunction} to each pair of joined elements.<br/>
 		 * Each JoinFunction call returns exactly one element. 
 		 * 
 		 * @param function The JoinFunction that is called for each pair of joined elements.
 		 * @return An EquiJoin that represents the joined result DataSet
 		 * 
-		 * @see JoinFunction
+		 * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
 		 * @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 		 * @see DataSet
 		 */
-		public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> function) {
+		public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> function) {
 			if (function == null) {
 				throw new NullPointerException("Join function must not be null.");
 			}
+			if (FunctionUtils.isSerializedLambdaFunction(function)) {
+				throw new UnsupportedLambdaExpressionException();
+			}
 			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
 			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
 		}
+
+		public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
+			if (function == null) {
+				throw new NullPointerException("Join function must not be null.");
+			}
+			if (FunctionUtils.isSerializedLambdaFunction(function)) {
+				throw new UnsupportedLambdaExpressionException();
+			}
+			FlatJoinFunction generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(function);
+			TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
+			return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint());
+		}
+
+		private static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
+
+			private static final long serialVersionUID = 1L;
+
+			private WrappingFlatJoinFunction(JoinFunction<IN1, IN2, OUT> wrappedFunction) {
+				super(wrappedFunction);
+			}
+
+			@Override
+			public void join(IN1 left, IN2 right, Collector<OUT> out) throws Exception {
+				out.collect (this.wrappedFunction.join(left, right));
+			}
+		}
+
+		/*
+		private static class GeneratedFlatJoinFunction<IN1, IN2, OUT> extends FlatJoinFunction<IN1, IN2, OUT> {
+
+			private Joinable<IN1,IN2,OUT> function;
+
+			private GeneratedFlatJoinFunction(Joinable<IN1, IN2, OUT> function) {
+				this.function = function;
+			}
+
+			@Override
+			public void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception {
+				out.collect(function.join(first, second));
+			}
+		}
+		*/
 		
 		/**
 		 * Initiates a ProjectJoin transformation and projects the first join input<br/>
@@ -530,7 +599,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2, keys1, keys2, 
-					new ProjectJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), 
+					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
 					returnType, hint);
 		}
 
@@ -821,20 +890,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	//  default join functions
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class DefaultJoinFunction<T1, T2> extends JoinFunction<T1, T2, Tuple2<T1, T2>> {
+	public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
 
 		private static final long serialVersionUID = 1L;
 		private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
 
 		@Override
-		public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception {
+		public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> out) throws Exception {
 			outTuple.f0 = first;
 			outTuple.f1 = second;
-			return outTuple;
+			out.collect(outTuple);
 		}
 	}
 	
-	public static final class ProjectJoinFunction<T1, T2, R extends Tuple> extends JoinFunction<T1, T2, R> {
+	public static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
 		
 		private static final long serialVersionUID = 1L;
 		
@@ -851,7 +920,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @param isFromFirst List of flags indicating whether the field should be copied from the first (true) or the second (false) input.
 		 * @param outTupleInstance An instance of an output tuple.
 		 */
-		private ProjectJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
+		private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
 			
 			if(fields.length != isFromFirst.length) {
 				throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!"); 
@@ -869,7 +938,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			return isFromFirst;
 		}
 
-		public R join(T1 in1, T2 in2) {
+		public void join(T1 in1, T2 in2, Collector<R> out) {
 			for(int i=0; i<fields.length; i++) {
 				if(isFromFirst[i]) {
 					if(fields[i] >= 0) {
@@ -885,27 +954,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					}
 				}
 			}
-			return outTuple;
+			out.collect(outTuple);
 		}
 	}
 	
-	public static final class LeftSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T1> {
+	public static final class LeftSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T1> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public T1 join(T1 left, T2 right) throws Exception {
-			return left;
+		//public T1 join(T1 left, T2 right) throws Exception {
+		//	return left;
+		//}
+		public void join (T1 left, T2 right, Collector<T1> out) {
+			out.collect(left);
 		}
 	}
 	
-	public static final class RightSemiJoinFunction<T1, T2> extends JoinFunction<T1, T2, T2> {
+	public static final class RightSemiFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, T2> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public T2 join(T1 left, T2 right) throws Exception {
-			return right;
+		//public T2 join(T1 left, T2 right) throws Exception {
+		//	return right;
+		//}
+		public void join (T1 left, T2 right, Collector<T2> out) {
+			out.collect(right);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 03c6037..eccdeec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.flink.api.java.DataSet;
@@ -34,7 +33,7 @@ import org.apache.flink.api.java.DataSet;
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
  * 
- * @see MapFunction
+ * @see org.apache.flink.api.common.functions.MapFunction
  */
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
 	
@@ -42,6 +41,7 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 	
 	
 	public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
+
 		super(input, TypeExtractor.getMapReturnTypes(function, input.getType()));
 		
 		this.function = function;
@@ -49,11 +49,11 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 	}
 
 	@Override
-	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		
 		String name = getName() != null ? getName() : function.getClass().getName();
 		// create operator
-		MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> po = new MapOperatorBase<IN, OUT, GenericMap<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
 		// set dop

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9e94670..dd5a3bd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
@@ -51,7 +51,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	}
 
 	@Override
-	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, GenericMap<IN,OUT>> translateToDataFlow(Operator<IN> input) {		
+	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
 		String name = getName() != null ? getName() : "Projection " + Arrays.toString(fields);
 		// create operator
 		PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
deleted file mode 100644
index d88d43d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.types.TypeInformation;
-
-import org.apache.flink.api.java.DataSet;
-
-/**
- * This operator represents the application of a "reduceGroup" function on a data set, and the
- * result data set produced by the function.
- * 
- * @param <IN> The type of the data set consumed by the operator.
- * @param <OUT> The type of the data set created by the operator.
- */
-public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, ReduceGroupOperator<IN, OUT>> {
-	
-	private final GroupReduceFunction<IN, OUT> function;
-	
-	private final Grouping<IN> grouper;
-	
-	private boolean combinable;
-	
-	
-	/**
-	 * Constructor for a non-grouped reduce (all reduce).
-	 * 
-	 * @param input The input data set to the groupReduce function.
-	 * @param function The user-defined GroupReduce function.
-	 */
-	public ReduceGroupOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) {
-		super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType()));
-		
-		this.function = function;
-		this.grouper = null;
-		checkCombinability();
-	}
-	
-	/**
-	 * Constructor for a grouped reduce.
-	 * 
-	 * @param input The grouped input to be processed group-wise by the groupReduce function.
-	 * @param function The user-defined GroupReduce function.
-	 */
-	public ReduceGroupOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) {
-		super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType()));
-		
-		this.function = function;
-		this.grouper = input;
-		checkCombinability();
-		
-		extractSemanticAnnotationsFromUdf(function.getClass());
-	}
-	
-	private void checkCombinability() {
-		if (function instanceof GenericCombine && function.getClass().getAnnotation(Combinable.class) != null) {
-			this.combinable = true;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Properties
-	// --------------------------------------------------------------------------------------------
-	
-	public boolean isCombinable() {
-		return combinable;
-	}
-	
-	public void setCombinable(boolean combinable) {
-		// sanity check that the function is a subclass of the combine interface
-		if (combinable && !(function instanceof GenericCombine)) {
-			throw new IllegalArgumentException("The function does not implement the combine interface.");
-		}
-		
-		this.combinable = combinable;
-	}
-	
-	@Override
-	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> translateToDataFlow(Operator<IN> input) {
-		
-		String name = getName() != null ? getName() : function.getClass().getName();
-		
-		// distinguish between grouped reduce and non-grouped reduce
-		if (grouper == null) {
-			// non grouped reduce
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, new int[0], name);
-
-			po.setCombinable(combinable);
-			// set input
-			po.setInput(input);
-			// the degree of parallelism for a non grouped reduce can only be 1
-			po.setDegreeOfParallelism(1);
-			return po;
-		}
-	
-		if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
-		
-			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
-			
-			PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
-							selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable());
-			
-			po.setDegreeOfParallelism(this.getParallelism());
-			
-			return po;
-		}
-		else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) {
-
-			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
-			po.setCombinable(combinable);
-			po.setInput(input);
-			po.setDegreeOfParallelism(this.getParallelism());
-			
-			// set group order
-			if (grouper instanceof SortedGrouping) {
-				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
-								
-				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
-				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-				
-				Ordering o = new Ordering();
-				for(int i=0; i < sortKeyPositions.length; i++) {
-					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
-				}
-				po.setGroupOrder(o);
-			}
-			
-			return po;
-		}
-		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-
-			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-			GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
-			po.setCombinable(combinable);
-			po.setInput(input);
-			po.setDegreeOfParallelism(this.getParallelism());
-			
-			return po;
-		}
-		else {
-			throw new UnsupportedOperationException("Unrecognized key type.");
-		}
-		
-	}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
-			boolean combinable)
-	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
-		
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
-		
-		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-		
-		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
-		
-		MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
-
-		reducer.setInput(mapper);
-		mapper.setInput(input);
-		
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
-		
-		return reducer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 12e0f89..13a6c91 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -40,7 +39,7 @@ import org.apache.flink.api.java.DataSet;
  * 
  * @param <IN> The type of the data set reduced by the operator.
  * 
- * @see ReduceFunction
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
 public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
 	
@@ -83,8 +82,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		if (grouper == null) {
 			// non grouped reduce
 			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
-			ReduceOperatorBase<IN, GenericReduce<IN>> po =
-					new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, new int[0], name);
+			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+					new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
 			// set input
 			po.setInput(input);
 			
@@ -109,8 +108,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 			// reduce with field positions
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
 			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
-			ReduceOperatorBase<IN, GenericReduce<IN>> po =
-					new ReduceOperatorBase<IN, GenericReduce<IN>>(function, operatorInfo, logicalKeyPositions, name);
+			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+					new ReduceOperatorBase<IN, ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name);
 			
 			// set input
 			po.setInput(input);
@@ -139,8 +138,8 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		
 		PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey);
 		
-		MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
-		MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
+		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
+		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
 
 		keyExtractingMap.setInput(input);
 		reducer.setInput(keyExtractingMap);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index fa2c1aa..dcdbed4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
 /**
  * The <tt>SingleInputUdfOperator</tt> is the base class of all unary operators that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
- * have one input (such as {@link org.apache.flink.api.java.functions.MapFunction} or
- * {@link org.apache.flink.api.java.functions.ReduceFunction}).
+ * have one input (such as {@link org.apache.flink.api.java.functions.RichMapFunction} or
+ * {@link org.apache.flink.api.java.functions.RichReduceFunction}).
  * <p>
  * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
  * through configuration objects, and semantic properties.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 89c8bb2..97b2417 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 
 /**
  * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
  * The following transformation can be applied on sorted groups:
  * <ul>
- * 	<li>{@link SortedGrouping#reduceGroup(GroupReduceFunction)},</li>
+ * 	<li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li>
  * </ul>
  * 
  * @param <T> The type of the elements of the sorted and grouped DataSet.
@@ -65,23 +67,27 @@ public class SortedGrouping<T> extends Grouping<T> {
 
 	/**
 	 * Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br/>
-	 * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
+	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet.
 	 * A GroupReduceFunction can iterate over all elements of a group and emit any
 	 *   number of output elements including none.
 	 * 
 	 * @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
 	 * @return A GroupReduceOperator that represents the reduced DataSet.
 	 * 
-	 * @see GroupReduceFunction
-	 * @see ReduceGroupOperator
+	 * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+	 * @see GroupReduceOperator
 	 * @see DataSet
 	 */
-	public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
+	public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		return new ReduceGroupOperator<T, R>(this, reducer);
+		if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+			throw new UnsupportedLambdaExpressionException();
+		}
+		return new GroupReduceOperator<T, R>(this, reducer);
 	}
+
 	
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index a85ca3f..f347fef 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
 /**
  * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
- * have two inputs (such as {@link org.apache.flink.api.java.functions.JoinFunction} or 
- * {@link org.apache.flink.api.java.functions.CoGroupFunction}).
+ * have two inputs (such as {@link org.apache.flink.api.java.functions.RichJoinFunction} or
+ * {@link org.apache.flink.api.java.functions.RichCoGroupFunction}).
  * <p>
  * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
  * through configuration objects, and semantic properties.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 2040a27..bf33f4e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.DataSet;
 
 /**
  * This interface marks operators as operators that execute user-defined functions (UDFs), such as
- * {@link org.apache.flink.api.java.functions.MapFunction}, {@link org.apache.flink.api.java.functions.ReduceFunction},
- * or {@link org.apache.flink.api.java.functions.CoGroupFunction}.
+ * {@link org.apache.flink.api.java.functions.RichMapFunction}, {@link org.apache.flink.api.java.functions.RichReduceFunction},
+ * or {@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
  * The UDF operators stand in contrast to operators that execute built-in operations, like aggregations.
  */
 public interface UdfOperator<O extends UdfOperator<O>> {
@@ -39,7 +39,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
 	
 	/**
 	 * Gets the configuration parameters that will be passed to the UDF's open method
-	 * {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)}. 
+	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)}.
 	 * The configuration is set via the {@link #withParameters(Configuration)}
 	 * method.
 	 * 
@@ -69,7 +69,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
 	
 	/**
 	 * Sets the configuration parameters for the UDF. These are optional parameters that are passed
-	 * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)} method.
+	 * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)} method.
 	 * 
 	 * @param parameters The configuration parameters for the UDF.
 	 * @return The operator itself, to allow chaining function calls.
@@ -83,7 +83,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
 	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getBroadcastVariable(String)}.
 	 * 
 	 * The runtime context itself is available in all UDFs via
-	 * {@link org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}.
+	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()}.
 	 * 
 	 * @param data The data set to be broadcasted.
 	 * @param name The name under which the broadcast data set retrieved.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 1d9d70d..9e71ba0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 
 public class UnsortedGrouping<T> extends Grouping<T> {
 
@@ -90,14 +92,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
-	 * For each group, the transformation consecutively calls a {@link ReduceFunction} 
+	 * For each group, the transformation consecutively calls a {@link org.apache.flink.api.java.functions.RichReduceFunction}
 	 *   until only a single element for each group remains. 
 	 * A ReduceFunction combines two elements into one new element of the same type.
 	 * 
 	 * @param reducer The ReduceFunction that is applied on each group of the DataSet.
 	 * @return A ReduceOperator that represents the reduced DataSet.
 	 * 
-	 * @see ReduceFunction
+	 * @see org.apache.flink.api.java.functions.RichReduceFunction
 	 * @see ReduceOperator
 	 * @see DataSet
 	 */
@@ -110,24 +112,28 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 	
 	/**
 	 * Applies a GroupReduce transformation on a grouped {@link DataSet}.<br/>
-	 * The transformation calls a {@link GroupReduceFunction} for each group of the DataSet.
+	 * The transformation calls a {@link org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of the DataSet.
 	 * A GroupReduceFunction can iterate over all elements of a group and emit any
 	 *   number of output elements including none.
 	 * 
 	 * @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
 	 * @return A GroupReduceOperator that represents the reduced DataSet.
 	 * 
-	 * @see GroupReduceFunction
-	 * @see ReduceGroupOperator
+	 * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+	 * @see GroupReduceOperator
 	 * @see DataSet
 	 */
-	public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
+	public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
 		if (reducer == null) {
 			throw new NullPointerException("GroupReduce function must not be null.");
 		}
-		return new ReduceGroupOperator<T, R>(this, reducer);
+		if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+			throw new UnsupportedLambdaExpressionException();
+		}
+		return new GroupReduceOperator<T, R>(this, reducer);
 	}
 
+
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index aea99e3..c7f65f0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,11 +19,11 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class KeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, T>> {
+public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 52cbcd3..a6cd837 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class KeyRemovingMapper<T, K> extends MapFunction<Tuple2<K, T>, T> {
+public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index 7fb9c0f..8ac2d01 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 
-public class PlanFilterOperator<T> extends FilterOperatorBase<T, GenericFlatMap<T, T>> {
+public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
 	
 	public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
 		super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
 	}
 
 	public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
-		implements GenericFlatMap<T, T>
+		implements FlatMapFunction<T, T>
 	{
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 521814c..4de7311 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.TypeInformation;
 
-public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, GenericMap<T, R>> {
+public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 
 	public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType) {
 		super(new MapProjector<T, R>(fields, outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
 	}
 	
 	public static final class MapProjector<T, R extends Tuple>
-		extends AbstractFunction
-		implements GenericMap<T, R>
+		extends AbstractRichFunction
+		implements MapFunction<T, R>
 	{
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 20bd3b0..89290f0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -20,20 +20,19 @@ package org.apache.flink.api.java.operators.translation;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.java.functions.CoGroupFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> 
-	extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+	extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
 {
 
-	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
 			Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -42,7 +41,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
 	}
 	
-	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
 			int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -51,7 +50,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
 				new int[]{0}, key2.computeLogicalKeyPositions(), name);
 	}
 	
-	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
 			Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -63,7 +62,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class TupleUnwrappingCoGrouper<I1, I2, OUT, K> extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
-		implements GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+		implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
 	{
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index c121efe..73ea004 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.java.functions.JoinFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> 
-	extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+	extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
 {
 
-	public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
 			Keys.SelectorFunctionKeys<I1, K> key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -40,7 +39,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
 				key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
 	}
 	
-	public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
 			int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -49,7 +48,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
 				new int[]{0}, key2.computeLogicalKeyPositions(), name);
 	}
 	
-	public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+	public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
 			Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, String name,
 			TypeInformation<OUT> type, TypeInformation<Tuple2<K, I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
 	{
@@ -59,21 +58,26 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
 	}
 
 	public static final class TupleUnwrappingJoiner<I1, I2, OUT, K>
-		extends WrappingFunction<JoinFunction<I1, I2, OUT>>
-		implements GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
 	{
 
 		private static final long serialVersionUID = 1L;
 		
-		private TupleUnwrappingJoiner(JoinFunction<I1, I2, OUT> wrapped) {
+		private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
 			super(wrapped);
 		}
 
+		//@SuppressWarnings("unchecked")
+		//@Override
+		//public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) throws Exception {
+		//	return wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1)));
+		//}
+
 		@SuppressWarnings("unchecked")
 		@Override
-		public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2,
-				Collector<OUT> out) throws Exception {
-			out.collect(wrappedFunction.join((I1)(value1.getField(1)), (I2)(value2.getField(1))));
+		public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+			wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
 		}
 		
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 5a59664..5e80455 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.java.operators.translation;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
@@ -35,12 +34,12 @@ import org.apache.flink.util.Collector;
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the group reduce operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GenericGroupReduce<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
 
 	public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
 			TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
 	{
-		super(combinable ? new TupleUnwrappingCombinableGroupReducer<IN, OUT, K>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+		super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
 				new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
 		
 		super.setCombinable(combinable);
@@ -48,9 +47,9 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	
 	// --------------------------------------------------------------------------------------------
 	
-	@Combinable
-	public static final class TupleUnwrappingCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GenericGroupReduce<Tuple2<K, IN>, OUT>, GenericCombine<Tuple2<K, IN>>
+	@RichGroupReduceFunction.Combinable
+	public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>, FlatCombineFunction<Tuple2<K, IN>>
 	{
 
 		private static final long serialVersionUID = 1L;
@@ -58,7 +57,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 		private TupleUnwrappingIterator<IN, K> iter;
 		private TupleWrappingCollector<IN, K> coll; 
 		
-		private TupleUnwrappingCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new TupleUnwrappingIterator<IN, K>();
 			this.coll = new TupleWrappingCollector<IN, K>(this.iter);
@@ -85,7 +84,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
 	}
 	
 	public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-		implements GenericGroupReduce<Tuple2<K, IN>, OUT>
+		implements GroupReduceFunction<Tuple2<K, IN>, OUT>
 	{
 	
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 66aa430..4da981c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
@@ -31,7 +30,7 @@ import org.apache.flink.types.TypeInformation;
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, GenericReduce<Tuple2<K, T>>> {
+public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
 
 	public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
 			TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
@@ -40,7 +39,7 @@ public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple
 	}
 
 	public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
-		implements GenericReduce<Tuple2<K, T>>
+		implements ReduceFunction<Tuple2<K, T>>
 	{
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index a915d1c..ecac775 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class TupleKeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, T>> {
+public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index c98df6b..267d879 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -29,20 +29,21 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Value;
 
 
-public abstract class WrappingFunction<T extends AbstractFunction> extends AbstractFunction {
+public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
 	
 	private static final long serialVersionUID = 1L;
 
 	protected final T wrappedFunction;
-	
-	
+
 	protected WrappingFunction(T wrappedFunction) {
 		this.wrappedFunction = wrappedFunction;
 	}
@@ -50,12 +51,12 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr
 	
 	@Override
 	public void open(Configuration parameters) throws Exception {
-		this.wrappedFunction.open(parameters);
+		FunctionUtils.openFunction(this.wrappedFunction, parameters);
 	}
 	
 	@Override
 	public void close() throws Exception {
-		this.wrappedFunction.close();
+		FunctionUtils.closeFunction(this.wrappedFunction);
 	}
 	
 	@Override
@@ -63,13 +64,16 @@ public abstract class WrappingFunction<T extends AbstractFunction> extends Abstr
 		super.setRuntimeContext(t);
 		
 		if (t instanceof IterationRuntimeContext) {
-			this.wrappedFunction.setRuntimeContext(new WrappingIterationRuntimeContext(t));
+			FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingIterationRuntimeContext(t));
 		}
 		else{
-			this.wrappedFunction.setRuntimeContext(new WrappingRuntimeContext(t));
+			FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new WrappingRuntimeContext(t));
 		}
 	}
-	
+
+	public T getWrappedFunction () {
+		return this.wrappedFunction;
+	}
 	
 	
 	private static class WrappingRuntimeContext implements RuntimeContext {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index 633adab..fa0ca11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -21,15 +21,14 @@ package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
  * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CoGroupOperator}.
  */
-public abstract class CoGroupFunction extends AbstractFunction implements GenericCoGrouper<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
index b2185a2..c4587fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
@@ -19,15 +19,13 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 
 /**
  * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.operators.CrossOperator}.
  */
-public abstract class CrossFunction extends AbstractFunction implements GenericCrosser<Record, Record, Record> {
+public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -35,14 +33,19 @@ public abstract class CrossFunction extends AbstractFunction implements GenericC
 	 * This method must be implemented to provide a user implementation of a cross.
 	 * It is called for each element of the Cartesian product of both input sets.
 
-	 * @param record1 The record from the second input.
-	 * @param record2 The record from the second input.
-	 * @param out A collector that collects all output records.
+	 * @param first The record from the second input.
+	 * @param second The record from the second input.
+	 * @return The result of the cross UDF
 	 * 
 	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
 	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
 	 *                   decide whether to retry the task execution.
 	 */
+
+	//@Override
+	//public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
+
 	@Override
-	public abstract void cross(Record record1, Record record2, Collector<Record> out) throws Exception;
+	public abstract Record cross(Record first, Record second) throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
index 0222c63..dce24a3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.operators.JoinOperator}.
  * It resembles an equality join of both inputs on their key fields.
  */
-public abstract class JoinFunction extends AbstractFunction implements GenericJoiner<Record, Record, Record> {
+public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -38,7 +38,7 @@ public abstract class JoinFunction extends AbstractFunction implements GenericJo
 	 * 
 	 * @param value1 The record that comes from the first input.
 	 * @param value2 The record that comes from the second input.
-	 * @param out A collector that collects all output pairs.
+	 * @return The result of the join UDF as record
 	 * 
 	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
 	 *                   runtime catches an exception, it aborts the combine task and lets the fail-over logic

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
index 88b6282..99c945d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The MapFunction must be extended to provide a mapper implementation
  * By definition, the mapper is called for each individual input record.
  */
-public abstract class MapFunction extends AbstractFunction implements GenericCollectorMap<Record, Record> {
+public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> {
 	
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index 4b1dbb3..073b11a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
  * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
  * {@link org.apache.flink.api.java.operators.ReduceOperator}.
  */
-public abstract class ReduceFunction extends AbstractFunction implements GenericGroupReduce<Record, Record>, GenericCombine<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction implements GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
 	
 	private static final long serialVersionUID = 1L;