You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/06/08 09:41:06 UTC

[2/2] flink git commit: [FLINK-6783] Changed passing index of type argument while extracting return type.

[FLINK-6783] Changed passing index of type argument while extracting return type.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcaf816d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcaf816d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcaf816d

Branch: refs/heads/master
Commit: bcaf816dc5313c6c7de1e3436cc87036fd2ea3d0
Parents: 1cc1bb4
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Thu Jun 1 13:17:25 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 8 10:42:41 2017 +0200

----------------------------------------------------------------------
 .../flink/api/common/functions/Partitioner.java |   6 +-
 .../api/java/typeutils/TypeExtractionUtils.java |  74 +++
 .../flink/api/java/typeutils/TypeExtractor.java | 642 ++++++++++++-------
 .../java/type/lambdas/LambdaExtractionTest.java |  13 +
 .../org/apache/flink/cep/CEPLambdaTest.java     |   2 -
 .../org/apache/flink/cep/PatternStream.java     |  24 +-
 .../cep/operator/CEPMigration11to13Test.java    |   4 +-
 .../flink/graph/asm/translate/Translate.java    |  44 +-
 .../api/datastream/AllWindowedStream.java       |  57 +-
 .../api/datastream/AsyncDataStream.java         |  13 +-
 .../api/datastream/CoGroupedStreams.java        |  15 +-
 .../api/datastream/ConnectedStreams.java        |  48 +-
 .../streaming/api/datastream/DataStream.java    |  16 +-
 .../streaming/api/datastream/JoinedStreams.java |  40 +-
 .../streaming/api/datastream/KeyedStream.java   |  16 +-
 .../api/datastream/WindowedStream.java          |  58 +-
 .../windowing/WindowTranslationTest.java        | 232 +++----
 17 files changed, 846 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
index 6c237ed..c272d3a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
@@ -22,15 +22,15 @@ import org.apache.flink.annotation.Public;
 
 /**
  * Function to implement a custom partition assignment for keys.
- * 
+ *
  * @param <K> The type of the key to be partitioned.
  */
 @Public
-public interface Partitioner<K> extends java.io.Serializable {
+public interface Partitioner<K> extends java.io.Serializable, Function {
 
 	/**
 	 * Computes the partition for the given key.
-	 * 
+	 *
 	 * @param key The key.
 	 * @param numPartitions The number of partitions to partition into.
 	 * @return The partition index.

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 0aac257..c6ffd55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
@@ -28,6 +29,8 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+
 import static org.objectweb.asm.Type.getConstructorDescriptor;
 import static org.objectweb.asm.Type.getMethodDescriptor;
 
@@ -161,6 +164,77 @@ public class TypeExtractionUtils {
 	}
 
 	/**
+	 * Extracts the type found at the given index path from the parameters of a lambda. It supports nested types.
+	 *
+	 * @param exec lambda function to extract the type from
+	 * @param lambdaTypeArgumentIndices path to the type: first the parameter index, then nested type argument indices
+	 * @param paramLen count of total parameters of the lambda (including closure parameters)
+	 * @param baseParametersLen count of lambda interface parameters (without closure parameters)
+	 * @return extracted type
+	 */
+	public static Type extractTypeFromLambda(
+		LambdaExecutable exec,
+		int[] lambdaTypeArgumentIndices,
+		int paramLen,
+		int baseParametersLen) {
+		Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]]; // index counted within the last baseParametersLen parameters
+		for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+			output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]); // descend one level of type arguments
+		}
+		return output;
+	}
+
+	/**
+	 * This method extracts the n-th type argument from the given type. An InvalidTypesException
+	 * is thrown if the type is not a {@link ParameterizedType} or if the index is outside the range
+	 * of its actual type arguments (negative, or not smaller than their number).
+	 *
+	 * @param t Type to extract the type arguments from
+	 * @param index Index of the type argument to extract
+	 * @return The extracted type argument
+	 * @throws InvalidTypesException if the given type is not a parameterized type or if the
+	 * index is negative or exceeds the number of type arguments.
+	 */
+	public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
+		if (t instanceof ParameterizedType) {
+			Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
+
+			if (index < 0 || index >= actualTypeArguments.length) { // reject out-of-range indices with a descriptive error
+				throw new InvalidTypesException("Cannot extract the type argument with index " +
+												index + " because the type has only " + actualTypeArguments.length +
+												" type arguments.");
+			} else {
+				return actualTypeArguments[index];
+			}
+		} else { // not a ParameterizedType, so there are no type arguments to index into
+			throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
+		}
+	}
+
+	/**
+	 * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
+	 * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
+	 *
+	 * @param baseClass class whose methods, as returned by {@link Class#getMethods()}, are searched for the SAM
+	 * @throws InvalidTypesException if the given class has more than one abstract method
+	 * @return the single abstract method, or {@code null} if the class has no abstract method
+	 */
+	public static Method getSingleAbstractMethod(Class<?> baseClass) {
+		Method sam = null;
+		for (Method method : baseClass.getMethods()) {
+			if (Modifier.isAbstract(method.getModifiers())) {
+				if (sam == null) { // first abstract method seen so far
+					sam = method;
+				} else { // a second abstract method was found, so there is no single one
+					throw new InvalidTypesException(
+						"Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
+				}
+			}
+		}
+		return sam; // still null when no abstract method was found
+	}
+
+	/**
 	 * Returns all declared methods of a class including methods of superclasses.
 	 */
 	public static List<Method> getAllDeclaredMethods(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 112ca38..c50dfc9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -116,6 +116,8 @@ public class TypeExtractor {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
+	public static final int[] NO_INDEX = new int[] {};
+
 	protected TypeExtractor() {
 		// only create instances for special use cases
 	}
@@ -161,9 +163,18 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
 			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) mapInterface,
+			MapFunction.class,
+			0,
+			1,
+			new int[]{0},
+			NO_INDEX,
+			inType,
+			functionName,
+			allowMissing);
 	}
-	
+
 
 	@PublicEvolving
 	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
@@ -174,7 +185,16 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType,
 			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) flatMapInterface,
+			FlatMapFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{1, 0},
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	/**
@@ -194,7 +214,16 @@ public class TypeExtractor {
 	@Deprecated
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) foldInterface,
+			FoldFunction.class,
+			0,
+			1,
+			new int[]{1},
+			NO_INDEX,
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -205,7 +234,15 @@ public class TypeExtractor {
 			boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing);
+			function,
+			AggregateFunction.class,
+			0,
+			1,
+			new int[]{0},
+			NO_INDEX,
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -216,7 +253,15 @@ public class TypeExtractor {
 			boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-				function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
+			function,
+			AggregateFunction.class,
+			0,
+			2,
+			NO_INDEX,
+			NO_INDEX,
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -228,7 +273,16 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType,
 			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) mapPartitionInterface,
+			MapPartitionFunction.class,
+			0,
+			1,
+			new int[]{0, 0},
+			new int[]{1, 0},
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -240,7 +294,16 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType,
 			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) groupReduceInterface,
+			GroupReduceFunction.class,
+			0,
+			1,
+			new int[]{0, 0},
+			new int[]{1, 0},
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -252,7 +315,16 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
 																			String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) combineInterface,
+			GroupCombineFunction.class,
+			0,
+			1,
+			new int[]{0, 0},
+			new int[]{1, 0},
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -266,8 +338,19 @@ public class TypeExtractor {
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
-		return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true,
-				in1Type, in2Type, functionName, allowMissing);
+		return getBinaryOperatorReturnType(
+			(Function) joinInterface,
+			FlatJoinFunction.class,
+			0,
+			1,
+			2,
+			new int[]{0},
+			new int[]{1},
+			new int[]{2, 0},
+			in1Type,
+			in2Type,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -281,8 +364,19 @@ public class TypeExtractor {
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
-		return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false,
-				in1Type, in2Type, functionName, allowMissing);
+		return getBinaryOperatorReturnType(
+			(Function) joinInterface,
+			JoinFunction.class,
+			0,
+			1,
+			2,
+			new int[]{0},
+			new int[]{1},
+			NO_INDEX,
+			in1Type,
+			in2Type,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -296,8 +390,19 @@ public class TypeExtractor {
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
-		return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true,
-				in1Type, in2Type, functionName, allowMissing);
+		return getBinaryOperatorReturnType(
+			(Function) coGroupInterface,
+			CoGroupFunction.class,
+			0,
+			1,
+			2,
+			new int[]{0, 0},
+			new int[]{1, 0},
+			new int[]{2, 0},
+			in1Type,
+			in2Type,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -311,8 +416,19 @@ public class TypeExtractor {
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
-		return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false,
-				in1Type, in2Type, functionName, allowMissing);
+		return getBinaryOperatorReturnType(
+			(Function) crossInterface,
+			CrossFunction.class,
+			0,
+			1,
+			2,
+			new int[]{0},
+			new int[]{1},
+			NO_INDEX,
+			in1Type,
+			in2Type,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -324,7 +440,16 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
 			TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
-		return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing);
+		return getUnaryOperatorReturnType(
+			(Function) selectorInterface,
+			KeySelector.class,
+			0,
+			1,
+			new int[]{0},
+			NO_INDEX,
+			inType,
+			functionName,
+			allowMissing);
 	}
 
 	@PublicEvolving
@@ -333,11 +458,53 @@ public class TypeExtractor {
 	}
 
 	@PublicEvolving
-	public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) {
-		return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null);
+	public static <T> TypeInformation<T> getPartitionerTypes(
+		Partitioner<T> partitioner,
+		String functionName,
+		boolean allowMissing) {
+		try {
+			final LambdaExecutable exec;
+			try {
+				exec = checkAndExtractLambda(partitioner);
+			} catch (TypeExtractionException e) {
+				throw new InvalidTypesException("Internal error occurred.", e);
+			}
+			if (exec != null) {
+				// check for lambda type erasure
+				validateLambdaGenericParameters(exec);
+
+				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
+				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
+				final int paramLen = exec.getParameterTypes().length;
+
+				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
+				// number of parameters the SAM of implemented interface has, the parameter indexing applies to this range
+				final int baseParametersLen = sam.getParameterTypes().length;
+
+				final Type keyType = TypeExtractionUtils.extractTypeFromLambda(
+					exec,
+					new int[]{0},
+					paramLen,
+					baseParametersLen);
+				return new TypeExtractor().privateCreateTypeInfo(keyType, null, null);
+			} else {
+				return new TypeExtractor().privateCreateTypeInfo(
+					Partitioner.class,
+					partitioner.getClass(),
+					0,
+					null,
+					null);
+			}
+		} catch (InvalidTypesException e) {
+			if (allowMissing) {
+				return (TypeInformation<T>) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e);
+			} else {
+				throw e;
+			}
+		}
 	}
-	
-	
+
+
 	@SuppressWarnings("unchecked")
 	@PublicEvolving
 	public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
@@ -354,49 +521,21 @@ public class TypeExtractor {
 	/**
 	 * Returns the unary operator's return type.
 	 *
-	 * @param function Function to extract the return type from
-	 * @param baseClass Base class of the function
-	 * @param hasIterable True if the first function parameter is an iterable, otherwise false
-	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
-	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
-	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
-	 * @param <IN> Input type
-	 * @param <OUT> Output type
-	 * @return TypeInformation of the return type of the function
-	 */
-	@SuppressWarnings("unchecked")
-	@PublicEvolving
-	public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
-		Function function,
-		Class<?> baseClass,
-		boolean hasIterable,
-		boolean hasCollector,
-		TypeInformation<IN> inType,
-		String functionName,
-		boolean allowMissing) {
-
-		return getUnaryOperatorReturnType(
-			function,
-			baseClass,
-			hasIterable ? 0 : -1,
-			hasCollector ? 0 : -1,
-			inType,
-			functionName,
-			allowMissing);
-	}
-
-	/**
-	 * Returns the unary operator's return type.
+	 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN</b>
+	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+	 *
+	 * <pre>
+	 * <code>
+	 * OUT apply(Map<String, List<IN>> value)
+	 * </code>
+	 * </pre>
 	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
-	 *                               specifying the input type if it is wrapped (Iterable, Map,
-	 *                               etc.). Otherwise -1.
-	 * @param outputTypeArgumentIndex Index of the type argument of function's second parameter
-	 *                                specifying the output type if it is wrapped in a Collector.
-	 *                                Otherwise -1.
+	 * @param inputTypeArgumentIndex Index of input type in the class specification
+	 * @param outputTypeArgumentIndex Index of output type in the class specification
+	 * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
+	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 	 * @param inType Type of the input elements (In case of an iterable, it is the element type)
 	 * @param functionName Function name
 	 * @param allowMissing Can the type information be missing
@@ -411,6 +550,8 @@ public class TypeExtractor {
 		Class<?> baseClass,
 		int inputTypeArgumentIndex,
 		int outputTypeArgumentIndex,
+		int[] lambdaInputTypeArgumentIndices,
+		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN> inType,
 		String functionName,
 		boolean allowMissing) {
@@ -422,37 +563,63 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
+				Preconditions.checkArgument(
+					lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+					"Indices for input type arguments within lambda not provided");
+				Preconditions.checkArgument(
+					lambdaOutputTypeArgumentIndices != null,
+					"Indices for output type arguments within lambda not provided");
 				// check for lambda type erasure
 				validateLambdaGenericParameters(exec);
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				final int paramLen = exec.getParameterTypes().length - 1;
+				// paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
+				final int paramLen = exec.getParameterTypes().length;
+
+				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+
+				// number of parameters the SAM of implemented interface has, the parameter indexing applies to this range
+				final int baseParametersLen = sam.getParameterTypes().length;
 
 				// executable references "this" implicitly
-				if (paramLen < 0) {
+				if (paramLen <= 0) {
 					// executable declaring class can also be a super class of the input type
 					// we only validate if the executable exists in input type
 					validateInputContainsExecutable(exec, inType);
 				}
 				else {
-					final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
-					validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
+					final Type input = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaInputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
+					validateInputType(input, inType);
 				}
 
 				if (function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
-				return new TypeExtractor().privateCreateTypeInfo(
-					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
-					inType,
-					null);
-			}
-			else {
-				validateInputType(baseClass, function.getClass(), 0, inType);
+
+				final Type output;
+				if (lambdaOutputTypeArgumentIndices.length > 0) {
+					output = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaOutputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
+				} else {
+					output = exec.getReturnType();
+				}
+
+				return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
+			} else {
+				Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
+				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+				validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
-				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null);
+				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null);
 			}
 		}
 		catch (InvalidTypesException e) {
@@ -467,54 +634,23 @@ public class TypeExtractor {
 	/**
 	 * Returns the binary operator's return type.
 	 *
-	 * @param function Function to extract the return type from
-	 * @param baseClass Base class of the function
-	 * @param hasIterables True if the first function parameter is an iterable, otherwise false
-	 * @param hasCollector True if the function has an additional collector parameter, otherwise false
-	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
-	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
-	 * @param functionName Function name
-	 * @param allowMissing Can the type information be missing
-	 * @param <IN1> Left side input type
-	 * @param <IN2> Right side input type
-	 * @param <OUT> Output type
-	 * @return TypeInformation of the return type of the function
-	 */
-	@SuppressWarnings("unchecked")
-	@PublicEvolving
-	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
-		Function function,
-		Class<?> baseClass,
-		boolean hasIterables,
-		boolean hasCollector,
-		TypeInformation<IN1> in1Type,
-		TypeInformation<IN2> in2Type,
-		String functionName,
-		boolean allowMissing) {
-
-		return getBinaryOperatorReturnType(
-			function,
-			baseClass,
-			hasIterables ? 0 : -1,
-			hasCollector ? 0 : -1,
-			in1Type,
-			in2Type,
-			functionName,
-			allowMissing
-		);
-	}
-
-	/**
-	 * Returns the binary operator's return type.
+	 * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN1</b>
+	 * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices.
+	 *
+	 * <pre>
+	 * <code>
+	 * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
+	 * </code>
+	 * </pre>
 	 *
 	 * @param function Function to extract the return type from
 	 * @param baseClass Base class of the function
-	 * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
-	 *                               specifying the input type if it is wrapped (Iterable, Map,
-	 *                               etc.). Otherwise -1.
-	 * @param outputTypeArgumentIndex Index of the type argument of functions second parameter
-	 *                                specifying the output type if it is wrapped in a Collector.
-	 *                                Otherwise -1.
+	 * @param input1TypeArgumentIndex Index of first input type in the class specification
+	 * @param input2TypeArgumentIndex Index of second input type in the class specification
+	 * @param outputTypeArgumentIndex Index of output type in the class specification
+	 * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example.
+	 * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example.
+	 * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
 	 * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
 	 * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
 	 * @param functionName Function name
@@ -529,8 +665,12 @@ public class TypeExtractor {
 	public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
 		Function function,
 		Class<?> baseClass,
-		int inputTypeArgumentIndex,
+		int input1TypeArgumentIndex,
+		int input2TypeArgumentIndex,
 		int outputTypeArgumentIndex,
+		int[] lambdaInput1TypeArgumentIndices,
+		int[] lambdaInput2TypeArgumentIndices,
+		int[] lambdaOutputTypeArgumentIndices,
 		TypeInformation<IN1> in1Type,
 		TypeInformation<IN2> in2Type,
 		String functionName,
@@ -543,30 +683,67 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Internal error occurred.", e);
 			}
 			if (exec != null) {
+				Preconditions.checkArgument(
+					lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
+					"Indices for first input type arguments within lambda not provided");
+				Preconditions.checkArgument(
+					lambdaInput2TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
+					"Indices for second input type arguments within lambda not provided");
+				Preconditions.checkArgument(
+					lambdaOutputTypeArgumentIndices != null,
+					"Indices for output type arguments within lambda not provided");
 				// check for lambda type erasure
 				validateLambdaGenericParameters(exec);
-				
+
+				final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+				final int baseParametersLen = sam.getParameterTypes().length;
+
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				final int paramLen = exec.getParameterTypes().length - 1;
-				final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1];
-				final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
-				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
-				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
+				final int paramLen = exec.getParameterTypes().length;
+
+				final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
+					exec,
+					lambdaInput1TypeArgumentIndices,
+					paramLen,
+					baseParametersLen);
+				final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
+					exec,
+					lambdaInput2TypeArgumentIndices,
+					paramLen,
+					baseParametersLen);
+
+				validateInputType(input1, in1Type);
+				validateInputType(input2, in2Type);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
+
+				final Type output;
+				if (lambdaOutputTypeArgumentIndices.length > 0) {
+					output = TypeExtractionUtils.extractTypeFromLambda(
+						exec,
+						lambdaOutputTypeArgumentIndices,
+						paramLen,
+						baseParametersLen);
+				} else {
+					output = exec.getReturnType();
+				}
+
 				return new TypeExtractor().privateCreateTypeInfo(
-					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
+					output,
 					in1Type,
 					in2Type);
 			}
 			else {
-				validateInputType(baseClass, function.getClass(), 0, in1Type);
-				validateInputType(baseClass, function.getClass(), 1, in2Type);
+				Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
+				Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
+				Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+				validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
+				validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
-				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type);
+				return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type);
 			}
 		}
 		catch (InvalidTypesException e) {
@@ -577,7 +754,7 @@ public class TypeExtractor {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Create type information
 	// --------------------------------------------------------------------------------------------
@@ -586,7 +763,7 @@ public class TypeExtractor {
 	public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
 		return (TypeInformation<T>) createTypeInfo((Type) type);
 	}
-	
+
 	public static TypeInformation<?> createTypeInfo(Type t) {
 		TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t);
 		if (ti == null) {
@@ -628,46 +805,46 @@ public class TypeExtractor {
 		}
 		return ti;
 	}
-	
+
 	// ----------------------------------- private methods ----------------------------------------
-	
+
 	private TypeInformation<?> privateCreateTypeInfo(Type t) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 		typeHierarchy.add(t);
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
 	}
-	
+
 	// for (Rich)Functions
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 		Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
-		
+
 		TypeInformation<OUT> typeInfo;
-		
+
 		// return type is a variable -> try to get the type info from the input directly
 		if (returnType instanceof TypeVariable<?>) {
 			typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) returnType, typeHierarchy, in1Type, in2Type);
-			
+
 			if (typeInfo != null) {
 				return typeInfo;
 			}
 		}
-		
+
 		// get info from hierarchy
 		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
-	
+
 	// for LambdaFunctions
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
-		
+
 		// get info from hierarchy
 		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
-	
+
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
@@ -680,29 +857,29 @@ public class TypeExtractor {
 		// check if type is a subclass of tuple
 		else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
 			Type curT = t;
-			
+
 			// do not allow usage of Tuple as type
 			if (typeToClass(t).equals(Tuple.class)) {
 				throw new InvalidTypesException(
 						"Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
 			}
-						
+
 			// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
-			// collect the types while moving up for a later top-down 
+			// collect the types while moving up for a later top-down
 			while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) {
 				typeHierarchy.add(curT);
 				curT = typeToClass(curT).getGenericSuperclass();
 			}
-			
+
 			if(curT == Tuple0.class) {
 				return new TupleTypeInfo(Tuple0.class);
 			}
-			
+
 			// check if immediate child of Tuple has generics
 			if (curT instanceof Class<?>) {
 				throw new InvalidTypesException("Tuple needs to be parameterized by using generics.");
 			}
-			
+
 			typeHierarchy.add(curT);
 
 			// create the type information for the subtypes
@@ -718,13 +895,13 @@ public class TypeExtractor {
 			}
 			// return tuple info
 			return new TupleTypeInfo(typeToClass(t), subTypesInfo);
-			
+
 		}
 		// type depends on another type
 		// e.g. class MyMapper<E> extends MapFunction<String, E>
 		else if (t instanceof TypeVariable) {
 			Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) t);
-			
+
 			if (!(typeVar instanceof TypeVariable)) {
 				return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type);
 			}
@@ -741,12 +918,12 @@ public class TypeExtractor {
 				}
 			}
 		}
-		// arrays with generics 
+		// arrays with generics
 		else if (t instanceof GenericArrayType) {
 			GenericArrayType genericArray = (GenericArrayType) t;
-			
+
 			Type componentType = genericArray.getGenericComponentType();
-			
+
 			// due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class
 			if (componentType instanceof Class) {
 				Class<?> componentClass = (Class<?>) componentType;
@@ -775,11 +952,11 @@ public class TypeExtractor {
 		else if (t instanceof Class) {
 			return privateGetForClass((Class<OUT>) t, typeHierarchy);
 		}
-		
+
 		throw new InvalidTypesException("Type Information could not be created.");
 	}
 
-	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, 
+	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
 			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
 
 		Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar);
@@ -791,12 +968,12 @@ public class TypeExtractor {
 		else {
 			returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
 		}
-		
+
 		// no input information exists
 		if (in1TypeInfo == null && in2TypeInfo == null) {
 			return null;
 		}
-		
+
 		// create a new type hierarchy for the input
 		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
 		// copy the function part of the type hierarchy
@@ -809,7 +986,7 @@ public class TypeExtractor {
 			}
 		}
 		ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
-		
+
 		TypeInformation<?> info = null;
 		if (in1TypeInfo != null) {
 			// find the deepest type variable that describes the type of input 1
@@ -898,18 +1075,18 @@ public class TypeExtractor {
 		// the input is a tuple
 		else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) {
 			ParameterizedType tupleBaseClass;
-			
+
 			// get tuple from possible tuple subclass
 			while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
 				inputTypeHierarchy.add(inType);
 				inType = typeToClass(inType).getGenericSuperclass();
 			}
 			inputTypeHierarchy.add(inType);
-			
+
 			// we can assume to be parameterized since we
 			// already did input validation
 			tupleBaseClass = (ParameterizedType) inType;
-			
+
 			Type[] tupleElements = tupleBaseClass.getActualTypeArguments();
 			// go thru all tuple elements and search for type variables
 			for (int i = 0; i < tupleElements.length; i++) {
@@ -1068,13 +1245,13 @@ public class TypeExtractor {
 	public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) {
 		return getParameterType(baseClass, null, clazz, pos);
 	}
-	
+
 	private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
 		if (typeHierarchy != null) {
 			typeHierarchy.add(clazz);
 		}
 		Type[] interfaceTypes = clazz.getGenericInterfaces();
-		
+
 		// search in interfaces for base class
 		for (Type t : interfaceTypes) {
 			Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
@@ -1082,18 +1259,18 @@ public class TypeExtractor {
 				return parameter;
 			}
 		}
-		
+
 		// search in superclass for base class
 		Type t = clazz.getGenericSuperclass();
 		Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
 		if (parameter != null) {
 			return parameter;
 		}
-		
-		throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " + 
+
+		throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " +
 						"Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
 	}
-	
+
 	private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
 		// base class
 		if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) {
@@ -1109,7 +1286,7 @@ public class TypeExtractor {
 				typeHierarchy.add(t);
 			}
 			return getParameterType(baseClass, typeHierarchy, (Class<?>) ((ParameterizedType) t).getRawType(), pos);
-		}			
+		}
 		else if (t instanceof Class<?> && baseClass.isAssignableFrom((Class<?>) t)) {
 			if (typeHierarchy != null) {
 				typeHierarchy.add(t);
@@ -1118,11 +1295,11 @@ public class TypeExtractor {
 		}
 		return null;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Validate input
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static void validateInputType(Type t, TypeInformation<?> inType) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 		try {
@@ -1132,7 +1309,7 @@ public class TypeExtractor {
 			throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
 		}
 	}
-	
+
 	private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
 		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
 
@@ -1152,21 +1329,21 @@ public class TypeExtractor {
 			throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
 		if (type == null) {
 			throw new InvalidTypesException("Unknown Error. Type is null.");
 		}
-		
+
 		if (typeInfo == null) {
 			throw new InvalidTypesException("Unknown Error. TypeInformation is null.");
 		}
-		
+
 		if (!(type instanceof TypeVariable<?>)) {
 			// check for Java Basic Types
 			if (typeInfo instanceof BasicTypeInfo) {
-				
+
 				TypeInformation<?> actual;
 				// check if basic type at all
 				if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
@@ -1176,7 +1353,7 @@ public class TypeExtractor {
 				if (!typeInfo.equals(actual)) {
 					throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'.");
 				}
-				
+
 			}
 			// check for Java SQL time types
 			else if (typeInfo instanceof SqlTimeTypeInfo) {
@@ -1198,36 +1375,36 @@ public class TypeExtractor {
 				if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
 					throw new InvalidTypesException("Tuple type expected.");
 				}
-				
+
 				// do not allow usage of Tuple as type
 				if (isClassType(type) && typeToClass(type).equals(Tuple.class)) {
 					throw new InvalidTypesException("Concrete subclass of Tuple expected.");
 				}
-				
+
 				// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
 				while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) {
 					typeHierarchy.add(type);
 					type = typeToClass(type).getGenericSuperclass();
 				}
-				
+
 				if(type == Tuple0.class) {
 					return;
 				}
-				
+
 				// check if immediate child of Tuple has generics
 				if (type instanceof Class<?>) {
 					throw new InvalidTypesException("Parameterized Tuple type expected.");
 				}
-				
+
 				TupleTypeInfo<?> tti = (TupleTypeInfo<?>) typeInfo;
-				
+
 				Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
-				
+
 				if (subTypes.length != tti.getArity()) {
 					throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '"
 							+ subTypes.length + "'.");
 				}
-				
+
 				for (int i = 0; i < subTypes.length; i++) {
 					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
 				}
@@ -1258,16 +1435,16 @@ public class TypeExtractor {
 						&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
 					throw new InvalidTypesException("Array type expected.");
 				}
-				
+
 				if (component instanceof TypeVariable<?>) {
 					component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
 					if (component instanceof TypeVariable) {
 						return;
 					}
 				}
-				
+
 				validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
-				
+
 			}
 			// check for object array
 			else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) {
@@ -1275,7 +1452,7 @@ public class TypeExtractor {
 				if (!(type instanceof Class<?> && ((Class<?>) type).isArray()) && !(type instanceof GenericArrayType)) {
 					throw new InvalidTypesException("Object array type expected.");
 				}
-				
+
 				// check component
 				Type component;
 				if (type instanceof Class<?>) {
@@ -1283,14 +1460,14 @@ public class TypeExtractor {
 				} else {
 					component = ((GenericArrayType) type).getGenericComponentType();
 				}
-				
+
 				if (component instanceof TypeVariable<?>) {
 					component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
 					if (component instanceof TypeVariable) {
 						return;
 					}
 				}
-				
+
 				validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
 			}
 			// check for value
@@ -1299,7 +1476,7 @@ public class TypeExtractor {
 				if (!(type instanceof Class<?> && Value.class.isAssignableFrom((Class<?>) type))) {
 					throw new InvalidTypesException("Value type expected.");
 				}
-				
+
 				TypeInformation<?> actual;
 				// check value type contents
 				if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
@@ -1456,33 +1633,6 @@ public class TypeExtractor {
 		return fieldCount;
 	}
 
-	/**
-	 * * This method extracts the n-th type argument from the given type. An InvalidTypesException
-	 * is thrown if the type does not have any type arguments or if the index exceeds the number
-	 * of type arguments.
-	 *
-	 * @param t Type to extract the type arguments from
-	 * @param index Index of the type argument to extract
-	 * @return The extracted type argument
-	 * @throws InvalidTypesException if the given type does not have any type arguments or if the
-	 * index exceeds the number of type arguments.
-	 */
-	private static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
-		if(t instanceof ParameterizedType) {
-			Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
-
-			if (index < 0 || index >= actualTypeArguments.length) {
-				throw new InvalidTypesException("Cannot extract the type argument with index " +
-					index + " because the type has only " + actualTypeArguments.length +
-					" type arguments.");
-			} else {
-				return actualTypeArguments[index];
-			}
-		} else {
-			throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
-		}
-	}
-	
 	private static void validateLambdaGenericParameters(LambdaExecutable exec) {
 		// check the arguments
 		for (Type t : exec.getParameterTypes()) {
@@ -1516,19 +1666,19 @@ public class TypeExtractor {
 		// iterate thru hierarchy from top to bottom until type variable gets a class assigned
 		for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
 			Type curT = typeHierarchy.get(i);
-			
+
 			// parameterized type
 			if (curT instanceof ParameterizedType) {
 				Class<?> rawType = ((Class<?>) ((ParameterizedType) curT).getRawType());
-				
+
 				for (int paramIndex = 0; paramIndex < rawType.getTypeParameters().length; paramIndex++) {
-					
+
 					TypeVariable<?> curVarOfCurT = rawType.getTypeParameters()[paramIndex];
-					
+
 					// check if variable names match
 					if (sameTypeVars(curVarOfCurT, inTypeTypeVar)) {
 						Type curVarType = ((ParameterizedType) curT).getActualTypeArguments()[paramIndex];
-						
+
 						// another type variable level
 						if (curVarType instanceof TypeVariable<?>) {
 							inTypeTypeVar = (TypeVariable<?>) curVarType;
@@ -1545,14 +1695,14 @@ public class TypeExtractor {
 		// return the type variable of the deepest level
 		return inTypeTypeVar;
 	}
-	
+
 	/**
 	 * Creates type information from a given Class such as Integer, String[] or POJOs.
-	 * 
-	 * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies. 
+	 *
+	 * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies.
 	 * In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type extraction
 	 * (a Class is a child of Type).
-	 * 
+	 *
 	 * @param clazz a Class to create TypeInformation for
 	 * @return TypeInformation that describes the passed Class
 	 */
@@ -1561,7 +1711,7 @@ public class TypeExtractor {
 		typeHierarchy.add(clazz);
 		return new TypeExtractor().privateGetForClass(clazz, typeHierarchy);
 	}
-	
+
 	private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
 		return privateGetForClass(clazz, typeHierarchy, null, null, null);
 	}
@@ -1600,13 +1750,13 @@ public class TypeExtractor {
 			if (primitiveArrayInfo != null) {
 				return primitiveArrayInfo;
 			}
-			
+
 			// basic type arrays: String[], Integer[], Double[]
 			BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
 			if (basicArrayInfo != null) {
 				return basicArrayInfo;
 			}
-			
+
 			// object arrays
 			else {
 				TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy(
@@ -1618,7 +1768,7 @@ public class TypeExtractor {
 				return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
 			}
 		}
-		
+
 		// check for writable types
 		if (isHadoopWritable(clazz)) {
 			return createHadoopWritableTypeInfo(clazz);
@@ -1635,13 +1785,13 @@ public class TypeExtractor {
 		if (timeTypeInfo != null) {
 			return timeTypeInfo;
 		}
-		
+
 		// check for subclasses of Value
 		if (Value.class.isAssignableFrom(clazz)) {
 			Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
 			return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
 		}
-		
+
 		// check for subclasses of Tuple
 		if (Tuple.class.isAssignableFrom(clazz)) {
 			if(clazz == Tuple0.class) {
@@ -1680,13 +1830,13 @@ public class TypeExtractor {
 		// return a generic type
 		return new GenericTypeInfo<OUT>(clazz);
 	}
-	
+
 	/**
 	 * Checks if the given field is a valid pojo field:
 	 * - it is public
 	 * OR
 	 *  - there are getter and setter methods for the field.
-	 *  
+	 *
 	 * @param f field to check
 	 * @param clazz class of field
 	 * @param typeHierarchy type hierarchy for materializing generic types
@@ -1753,7 +1903,7 @@ public class TypeExtractor {
 			LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType");
 			return new GenericTypeInfo<OUT>(clazz);
 		}
-		
+
 		// add the hierarchy of the POJO itself if it is generic
 		if (parameterizedType != null) {
 			getTypeHierarchy(typeHierarchy, parameterizedType, Object.class);
@@ -1762,7 +1912,7 @@ public class TypeExtractor {
 		else if (typeHierarchy.size() <= 1) {
 			getTypeHierarchy(typeHierarchy, clazz, Object.class);
 		}
-		
+
 		List<Field> fields = getAllDeclaredFields(clazz, false);
 		if (fields.size() == 0) {
 			LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType");
@@ -1822,7 +1972,7 @@ public class TypeExtractor {
 			LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO.");
 			return null;
 		}
-		
+
 		// everything is checked, we return the pojo
 		return pojoType;
 	}
@@ -1870,7 +2020,7 @@ public class TypeExtractor {
 		}
 		return null;
 	}
-	
+
 	private static boolean hasFieldWithSameName(String name, List<Field> fields) {
 		for(Field field : fields) {
 			if(name.equals(field.getName())) {
@@ -1879,7 +2029,7 @@ public class TypeExtractor {
 		}
 		return false;
 	}
-	
+
 	private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
 		for (int j = 0; j < pojoInfo.getArity(); j++) {
 			PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j);
@@ -1911,20 +2061,20 @@ public class TypeExtractor {
 			Tuple t = (Tuple) value;
 			int numFields = t.getArity();
 			if(numFields != countFieldsInClass(value.getClass())) {
-				// not a tuple since it has more fields. 
+				// not a tuple since it has more fields.
 				return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because
 				// there is currently no other type that can handle such a class.
 			}
-			
+
 			TypeInformation<?>[] infos = new TypeInformation[numFields];
 			for (int i = 0; i < numFields; i++) {
 				Object field = t.getField(i);
-				
+
 				if (field == null) {
 					throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. "
 							+ "Please specify the types directly.");
 				}
-				
+
 				infos[i] = privateGetForObject(field);
 			}
 			return new TupleTypeInfo(value.getClass(), infos);
@@ -2013,10 +2163,10 @@ public class TypeExtractor {
 	static void validateIfWritable(TypeInformation<?> typeInfo, Type type) {
 		try {
 			// try to load the writable type info
-			
+
 			Class<?> writableTypeInfoClass = Class
 					.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader());
-			
+
 			if (writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) {
 				// this is actually a writable type info
 				// check if the type is a writable

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64ff605..7500d73 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -261,6 +262,18 @@ public class LambdaExtractionTest {
 		Assert.assertTrue(ti instanceof MissingTypeInfo);
 	}
 
+	@Test
+	public void testPartitionerLambda() {
+		Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
+		final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
+
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(2, ti.getArity());
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
+		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+
+	}
+
 	private static class MyType {
 		private int key;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 4eff037..37bf872 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -53,7 +53,6 @@ public class CEPLambdaTest extends TestLogger {
 	 * Tests that a Java8 lambda can be passed as a CEP select function.
 	 */
 	@Test
-	@Ignore
 	public void testLambdaSelectFunction() {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
@@ -81,7 +80,6 @@ public class CEPLambdaTest extends TestLogger {
 	 * Tests that a Java8 lambda can be passed as a CEP flat select function.
 	 */
 	@Test
-	@Ignore
 	public void testLambdaFlatSelectFunction() {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 71614cf..3131a94 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -83,8 +83,10 @@ public class PatternStream<T> {
 		TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
+			0,
 			1,
-			-1,
+			new int[]{0, 1, 0},
+			new int[]{},
 			inputStream.getType(),
 			null,
 			false);
@@ -142,8 +144,10 @@ public class PatternStream<T> {
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternTimeoutFunction,
 			PatternTimeoutFunction.class,
+			0,
 			1,
-			-1,
+			new int[]{0, 1, 0},
+			new int[]{},
 			inputStream.getType(),
 			null,
 			false);
@@ -151,8 +155,10 @@ public class PatternStream<T> {
 		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
+			0,
 			1,
-			-1,
+			new int[]{0, 1, 0},
+			new int[]{},
 			inputStream.getType(),
 			null,
 			false);
@@ -184,8 +190,10 @@ public class PatternStream<T> {
 		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
-			1,
 			0,
+			1,
+			new int[] {0, 1, 0},
+			new int[] {1, 0},
 			inputStream.getType(),
 			null,
 			false);
@@ -244,8 +252,10 @@ public class PatternStream<T> {
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatTimeoutFunction,
 			PatternFlatTimeoutFunction.class,
+			0,
 			1,
-			-1,
+			new int[]{0, 1, 0},
+			new int[]{2, 0},
 			inputStream.getType(),
 			null,
 			false);
@@ -253,8 +263,10 @@ public class PatternStream<T> {
 		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
+			0,
 			1,
-			-1,
+			new int[]{0, 1, 0},
+			new int[]{1, 0},
 			inputStream.getType(),
 			null,
 			false);

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 69ba42f..950b5da 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -232,7 +232,7 @@ public class CEPMigration11to13Test {
 		NullByteKeySelector keySelector = new NullByteKeySelector();
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
-				new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
 						new KeyedCEPPatternOperator<>(
 								Event.createTypeSerializer(),
 								false,
@@ -284,7 +284,7 @@ public class CEPMigration11to13Test {
 			OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
 			harness.close();
 
-			harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
 				new KeyedCEPPatternOperator<>(
 					Event.createTypeSerializer(),
 					false,

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 9c4f88e..4cb4e01 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -73,7 +73,16 @@ public class Translate {
 
 		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(0);
-		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+			translator,
+			TranslateFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{1},
+			oldType,
+			null,
+			false);
 		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1);
 
 		TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
@@ -148,7 +157,16 @@ public class Translate {
 
 		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) (Class<? extends Edge>) Edge.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(0);
-		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+			translator,
+			TranslateFunction.class,
+			0,
+			1,
+			new int[] {0},
+			new int[] {1},
+			oldType,
+			null,
+			false);
 		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(2);
 
 		TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
@@ -225,7 +243,16 @@ public class Translate {
 		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(1);
-		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+			translator,
+			TranslateFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{1},
+			oldType,
+			null,
+			false);
 
 		TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType);
 
@@ -300,7 +327,16 @@ public class Translate {
 		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? extends Edge>) Edge.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0);
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(2);
-		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+			translator,
+			TranslateFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{1},
+			oldType,
+			null,
+			false);
 
 		TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31dbb4f..ae97109 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -236,8 +236,7 @@ public class AllWindowedStream<T, W extends Window> {
 			AllWindowFunction<T, R, W> function) {
 
 		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, AllWindowFunction.class, true, true, inType, null, false);
+		TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
 
 		return reduce(reduceFunction, function, resultType);
 	}
@@ -332,8 +331,7 @@ public class AllWindowedStream<T, W extends Window> {
 			ReduceFunction<T> reduceFunction,
 			ProcessAllWindowFunction<T, R, W> function) {
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false);
+		TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, input.getType());
 
 		return reduce(reduceFunction, function, resultType);
 	}
@@ -507,12 +505,41 @@ public class AllWindowedStream<T, W extends Window> {
 		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
 				aggFunction, input.getType(), null, false);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
+		TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);
 
 		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 	}
 
+	private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
+			AllWindowFunction<IN, OUT, ?> function,
+			TypeInformation<IN> inType) {
+		return TypeExtractor.getUnaryOperatorReturnType( // resolves OUT's type information from the function and inType
+			function,
+			AllWindowFunction.class,
+			0, // type-argument index of IN in AllWindowFunction<IN, OUT, ?>
+			1, // type-argument index of OUT in AllWindowFunction<IN, OUT, ?>
+			new int[]{1, 0}, // NOTE(review): presumably the lambda path to IN (parameter 1, its type argument 0) - confirm against TypeExtractor
+			new int[]{2, 0}, // NOTE(review): presumably the lambda path to OUT (parameter 2, its type argument 0) - confirm against TypeExtractor
+			inType,
+			null,
+			false);
+	}
+
+	private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
+			ProcessAllWindowFunction<IN, OUT, ?> function,
+			TypeInformation<IN> inType) {
+		return TypeExtractor.getUnaryOperatorReturnType( // resolves OUT's type information from the function and inType
+			function,
+			ProcessAllWindowFunction.class,
+			0, // type-argument index of IN in ProcessAllWindowFunction<IN, OUT, ?>
+			1, // type-argument index of OUT in ProcessAllWindowFunction<IN, OUT, ?>
+			new int[]{1, 0}, // NOTE(review): presumably the lambda path to IN (parameter 1, its type argument 0) - confirm against TypeExtractor
+			new int[]{2, 0}, // NOTE(review): presumably the lambda path to OUT (parameter 2, its type argument 0) - confirm against TypeExtractor
+			inType,
+			null,
+			false);
+	}
+
 	/**
 	 * Applies the given window function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the window function is
@@ -642,8 +669,7 @@ public class AllWindowedStream<T, W extends Window> {
 		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
 				aggFunction, input.getType(), null, false);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false);
+		TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(windowFunction, aggResultType);
 
 		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 	}
@@ -811,8 +837,7 @@ public class AllWindowedStream<T, W extends Window> {
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
 			Utils.getCallLocationName(), true);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, AllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+		TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType);
 
 		return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
 	}
@@ -923,8 +948,7 @@ public class AllWindowedStream<T, W extends Window> {
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
 			Utils.getCallLocationName(), true);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+		TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, foldAccumulatorType);
 
 		return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
 	}
@@ -1032,8 +1056,7 @@ public class AllWindowedStream<T, W extends Window> {
 	public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
 		String callLocation = Utils.getCallLocationName();
 		function = input.getExecutionEnvironment().clean(function);
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, AllWindowFunction.class, true, true, getInputType(), null, false);
+		TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
 		return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
 	}
 
@@ -1069,8 +1092,7 @@ public class AllWindowedStream<T, W extends Window> {
 	public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
 		String callLocation = Utils.getCallLocationName();
 		function = input.getExecutionEnvironment().clean(function);
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false);
+		TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType());
 		return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
 	}
 
@@ -1160,8 +1182,7 @@ public class AllWindowedStream<T, W extends Window> {
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
 		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, AllWindowFunction.class, true, true, inType, null, false);
+		TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
 
 		return apply(reduceFunction, function, resultType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index 8461d2c..cb18a3f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -67,9 +67,16 @@ public class AsyncDataStream {
 			int bufSize,
 			OutputMode mode) {
 
-		TypeInformation<OUT> outTypeInfo =
-			TypeExtractor.getUnaryOperatorReturnType(func, AsyncFunction.class, false,
-				true, in.getType(), Utils.getCallLocationName(), true);
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			func,
+			AsyncFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{1, 0},
+			in.getType(),
+			Utils.getCallLocationName(),
+			true);
 
 		// create transform
 		AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 8dad1cb..4bbb123 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -235,15 +235,12 @@ public class CoGroupedStreams<T1, T2> {
 		 */
 		public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
 
-			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					CoGroupFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"CoGroup",
-					false);
+			TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
+				function,
+				input1.getType(),
+				input2.getType(),
+				"CoGroup",
+				false);
 
 			return apply(function, resultType);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 0b882c8..e244bd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -203,9 +203,19 @@ public class ConnectedStreams<IN1, IN2> {
 	 */
 	public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
 
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
-				CoMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+			coMapper,
+			CoMapFunction.class,
+			0,
+			1,
+			2,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			getType1(),
+			getType2(),
+			Utils.getCallLocationName(),
+			true);
 
 		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
 
@@ -227,9 +237,19 @@ public class ConnectedStreams<IN1, IN2> {
 	public <R> SingleOutputStreamOperator<R> flatMap(
 			CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
 
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				CoFlatMapFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+			coFlatMapper,
+			CoFlatMapFunction.class,
+			0,
+			1,
+			2,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			getType1(),
+			getType2(),
+			Utils.getCallLocationName(),
+			true);
 
 		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 	}
@@ -254,9 +274,19 @@ public class ConnectedStreams<IN1, IN2> {
 	public <R> SingleOutputStreamOperator<R> process(
 			CoProcessFunction<IN1, IN2, R> coProcessFunction) {
 
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
-				CoProcessFunction.class, false, true, getType1(), getType2(),
-				Utils.getCallLocationName(), true);
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+			coProcessFunction,
+			CoProcessFunction.class,
+			0,
+			1,
+			2,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			TypeExtractor.NO_INDEX,
+			getType1(),
+			getType2(),
+			Utils.getCallLocationName(),
+			true);
 
 		return process(coProcessFunction, outTypeInfo);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0cdc9a1..66cd8e6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -574,13 +574,15 @@ public class DataStream<T> {
 	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
-				processFunction,
-				ProcessFunction.class,
-				false,
-				true,
-				getType(),
-				Utils.getCallLocationName(),
-				true);
+			processFunction,
+			ProcessFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{2, 0},
+			getType(),
+			Utils.getCallLocationName(),
+			true);
 
 		return process(processFunction, outType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index e1ffe86..f23ebcf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -221,14 +221,18 @@ public class JoinedStreams<T1, T2> {
 		 */
 		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
 			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					JoinFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"Join",
-					false);
+				function,
+				JoinFunction.class,
+				0,
+				1,
+				2,
+				new int[]{0},
+				new int[]{1},
+				TypeExtractor.NO_INDEX,
+				input1.getType(),
+				input2.getType(),
+				"Join",
+				false);
 
 			return apply(function, resultType);
 		}
@@ -300,14 +304,18 @@ public class JoinedStreams<T1, T2> {
 		 */
 		public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
 			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
-					function,
-					FlatJoinFunction.class,
-					true,
-					true,
-					input1.getType(),
-					input2.getType(),
-					"Join",
-					false);
+				function,
+				FlatJoinFunction.class,
+				0,
+				1,
+				2,
+				new int[]{0},
+				new int[]{1},
+				new int[]{2, 0},
+				input1.getType(),
+				input2.getType(),
+				"Join",
+				false);
 
 			return apply(function, resultType);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 698deb8..851e614 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -267,13 +267,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
-				processFunction,
-				ProcessFunction.class,
-				false,
-				true,
-				getType(),
-				Utils.getCallLocationName(),
-				true);
+			processFunction,
+			ProcessFunction.class,
+			0,
+			1,
+			new int[]{0},
+			new int[]{2, 0},
+			getType(),
+			Utils.getCallLocationName(),
+			true);
 
 		return process(processFunction, outType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f8a1914..a795064 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> {
 			LegacyWindowOperatorType legacyWindowOpType) {
 
 		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, WindowFunction.class, true, true, inType, null, false);
+		TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
 
 		return reduce(reduceFunction, function, resultType, legacyWindowOpType);
 	}
@@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, ProcessWindowFunction.class, true, true, input.getType(), null, false);
+		TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, input.getType(), null);
 
 		return reduce(reduceFunction, function, resultType);
 	}
@@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> {
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
 			Utils.getCallLocationName(), true);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, WindowFunction.class, true, true, foldAccumulatorType, null, false);
+		TypeInformation<R> resultType = getWindowFunctionReturnType(function, foldAccumulatorType);
 
 		return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
 	}
@@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> {
 		TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
 				Utils.getCallLocationName(), true);
 
-		TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType(
-				windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false);
+		TypeInformation<R> windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName());
 
 		return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType);
 	}
@@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> {
 		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
 				aggFunction, input.getType(), null, false);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				windowFunction, WindowFunction.class, true, true, aggResultType, null, false);
+		TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType);
 
 		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 	}
@@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> {
 		TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
 				aggFunction, input.getType(), null, false);
 
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+		TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);
 
 		return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
 	}
 
+	private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType(
+			WindowFunction<IN, OUT, KEY, ?> function,
+			TypeInformation<IN> inType) {
+		return TypeExtractor.getUnaryOperatorReturnType( // resolves OUT's type information from the function and inType
+			function,
+			WindowFunction.class,
+			0, // type-argument index of IN in WindowFunction<IN, OUT, KEY, ?>
+			1, // type-argument index of OUT in WindowFunction<IN, OUT, KEY, ?>
+			new int[]{2, 0}, // NOTE(review): presumably the lambda path to IN (parameter 2, its type argument 0) - confirm against TypeExtractor
+			new int[]{3, 0}, // NOTE(review): presumably the lambda path to OUT (parameter 3, its type argument 0) - confirm against TypeExtractor
+			inType,
+			null,
+			false);
+	}
+
+	private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType(
+			ProcessWindowFunction<IN, OUT, KEY, ?> function,
+			TypeInformation<IN> inType,
+			String functionName) {
+		return TypeExtractor.getUnaryOperatorReturnType( // resolves OUT's type information from the function and inType
+			function,
+			ProcessWindowFunction.class,
+			0, // type-argument index of IN in ProcessWindowFunction<IN, OUT, KEY, ?>
+			1, // type-argument index of OUT in ProcessWindowFunction<IN, OUT, KEY, ?>
+			new int[]{2, 0}, // NOTE(review): presumably the lambda path to IN (parameter 2, its type argument 0) - confirm against TypeExtractor
+			new int[]{3, 0}, // NOTE(review): presumably the lambda path to OUT (parameter 3, its type argument 0) - confirm against TypeExtractor
+			inType,
+			functionName,
+			false);
+	}
+
 	/**
 	 * Applies the given window function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the window function is
@@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, getInputType(), null, false);
+		TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType());
 
 		return apply(function, resultType);
 	}
@@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-			function, ProcessWindowFunction.class, true, true, getInputType(), null, false);
+		TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null);
 
 		return process(function, resultType);
 	}
@@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> {
 	@Deprecated
 	public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
 		TypeInformation<T> inType = input.getType();
-		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
-				function, WindowFunction.class, true, true, inType, null, false);
+		TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
 
 		return apply(reduceFunction, function, resultType);
 	}