You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/12 09:03:10 UTC

flink git commit: [FLINK-4793] [types] Improve lambda constructor reference handling

Repository: flink
Updated Branches:
  refs/heads/master 6731ec1e4 -> 1dda3ad00


[FLINK-4793] [types] Improve lambda constructor reference handling

This closes #2621.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dda3ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dda3ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dda3ad0

Branch: refs/heads/master
Commit: 1dda3ad009667697a620359e997e83a5ba2447dd
Parents: 6731ec1
Author: twalthr <tw...@apache.org>
Authored: Tue Oct 11 15:33:20 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Wed Oct 12 11:02:45 2016 +0200

----------------------------------------------------------------------
 flink-core/pom.xml                              |   7 +
 .../common/functions/util/FunctionUtils.java    |  69 --------
 .../java/typeutils/TypeExtractionException.java |  57 +++++++
 .../api/java/typeutils/TypeExtractionUtils.java | 167 +++++++++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java |  80 ++++-----
 .../java/type/lambdas/LambdaExtractionTest.java |  27 +--
 .../javaApiOperators/lambdas/MapITCase.java     |  20 ++-
 7 files changed, 306 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 865a253..cfa2cbb 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,6 +80,13 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- ASM is needed for type extraction -->
+		<dependency>
+			<groupId>org.ow2.asm</groupId>
+			<artifactId>asm-all</artifactId>
+			<version>${asm.version}</version>
+		</dependency>
+
 		<!-- test dependencies -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index ffd885b..2bb1cb3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.lang.reflect.Method;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -62,73 +60,6 @@ public final class FunctionUtils {
 			return defaultContext;
 		}
 	}
-	
-	public static Method checkAndExtractLambdaMethod(Function function) {
-		try {
-			// get serialized lambda
-			Object serializedLambda = null;
-			for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
-				try {
-					Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
-					replaceMethod.setAccessible(true);
-					Object serialVersion = replaceMethod.invoke(function);
-
-					// check if class is a lambda function
-					if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
-
-						// check if SerializedLambda class is present
-						try {
-							Class.forName("java.lang.invoke.SerializedLambda");
-						}
-						catch (Exception e) {
-							throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8");
-						}
-						serializedLambda = serialVersion;
-						break;
-					}
-				}
-				catch (NoSuchMethodException e) {
-					// thrown if the method is not there. fall through the loop
-				}
-			}
-
-			// not a lambda method -> return null
-			if (serializedLambda == null) {
-				return null;
-			}
-
-			// find lambda method
-			Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
-			Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
-
-			String className = (String) implClassMethod.invoke(serializedLambda);
-			String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
-
-			Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
-
-			Method[] methods = implClass.getDeclaredMethods();
-			Method parameterizedMethod = null;
-			for (Method method : methods) {
-				if(method.getName().equals(methodName)) {
-					if(parameterizedMethod != null) {
-						// It is very unlikely that a class contains multiple e.g. "lambda$2()" but its possible
-						// Actually, the signature need to be checked, but this is very complex
-						throw new Exception("Lambda method name is not unique.");
-					}
-					else {
-						parameterizedMethod = method;
-					}
-				}
-			}
-			if (parameterizedMethod == null) {
-				throw new Exception("No lambda method found.");
-			}
-			return parameterizedMethod;
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Could not extract lambda method out of function: " + e.getClass().getSimpleName() + " - " + e.getMessage(), e);
-		}
-	}
 
 	/**
 	 * Private constructor to prevent instantiation.

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
new file mode 100644
index 0000000..0dad55d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Type extraction always contains some uncertainty due to unpredictable JVM differences
+ * between vendors or versions. This exception is thrown if an assumption failed during extraction.
+ */
+@Internal
+public class TypeExtractionException extends Exception {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new exception with no message.
+	 */
+	public TypeExtractionException() {
+		super();
+	}
+
+	/**
+	 * Creates a new exception with the given message.
+	 *
+	 * @param message The exception message.
+	 */
+	public TypeExtractionException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a new exception with the given message and cause.
+	 *
+	 * @param message The exception message.
+	 * @param e cause
+	 */
+	public TypeExtractionException(String message, Throwable e) {
+		super(message, e);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
new file mode 100644
index 0000000..4439612
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Function;
+import static org.objectweb.asm.Type.getConstructorDescriptor;
+import static org.objectweb.asm.Type.getMethodDescriptor;
+
+@Internal
+public class TypeExtractionUtils {
+
+	private TypeExtractionUtils() {
+		// do not allow instantiation
+	}
+
+	/**
+	 * Similar to a Java 8 Executable but with a return type.
+	 */
+	public static class LambdaExecutable {
+
+		private Type[] parameterTypes;
+		private Type returnType;
+		private String name;
+		private Object executable;
+
+		public LambdaExecutable(Constructor<?> constructor) {
+			this.parameterTypes = constructor.getGenericParameterTypes();
+			this.returnType = constructor.getDeclaringClass();
+			this.name = constructor.getName();
+			this.executable = constructor;
+		}
+
+		public LambdaExecutable(Method method) {
+			this.parameterTypes = method.getGenericParameterTypes();
+			this.returnType = method.getGenericReturnType();
+			this.name = method.getName();
+			this.executable = method;
+		}
+
+		public Type[] getParameterTypes() {
+			return parameterTypes;
+		}
+
+		public Type getReturnType() {
+			return returnType;
+		}
+
+		public String getName() {
+			return name;
+		}
+
+		public boolean executablesEquals(Method m) {
+			return executable.equals(m);
+		}
+
+		public boolean executablesEquals(Constructor<?> c) {
+			return executable.equals(c);
+		}
+	}
+
+	public static LambdaExecutable checkAndExtractLambda(Function function) throws TypeExtractionException {
+		try {
+			// get serialized lambda
+			Object serializedLambda = null;
+			for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+				try {
+					Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+					replaceMethod.setAccessible(true);
+					Object serialVersion = replaceMethod.invoke(function);
+
+					// check if class is a lambda function
+					if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+
+						// check if SerializedLambda class is present
+						try {
+							Class.forName("java.lang.invoke.SerializedLambda");
+						}
+						catch (Exception e) {
+							throw new TypeExtractionException("User code tries to use lambdas, but framework is running with a Java version < 8");
+						}
+						serializedLambda = serialVersion;
+						break;
+					}
+				}
+				catch (NoSuchMethodException e) {
+					// thrown if the method is not there. fall through the loop
+				}
+			}
+
+			// not a lambda method -> return null
+			if (serializedLambda == null) {
+				return null;
+			}
+
+			// find lambda method
+			Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
+			Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
+			Method implMethodSig = serializedLambda.getClass().getDeclaredMethod("getImplMethodSignature");
+
+			String className = (String) implClassMethod.invoke(serializedLambda);
+			String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
+			String methodSig = (String) implMethodSig.invoke(serializedLambda);
+
+			Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
+
+			// find constructor
+			if (methodName.equals("<init>")) {
+				Constructor<?>[] constructors = implClass.getDeclaredConstructors();
+				for (Constructor<?> constructor : constructors) {
+					if(getConstructorDescriptor(constructor).equals(methodSig)) {
+						return new LambdaExecutable(constructor);
+					}
+				}
+			}
+			// find method
+			else {
+				List<Method> methods = getAllDeclaredMethods(implClass);
+				for (Method method : methods) {
+					if(method.getName().equals(methodName) && getMethodDescriptor(method).equals(methodSig)) {
+						return new LambdaExecutable(method);
+					}
+				}
+			}
+			throw new TypeExtractionException("No lambda method found.");
+		}
+		catch (Exception e) {
+			throw new TypeExtractionException("Could not extract lambda method out of function: " +
+				e.getClass().getSimpleName() + " - " + e.getMessage(), e);
+		}
+	}
+
+	/**
+	 * Returns all declared methods of a class including methods of superclasses.
+	 */
+	public static List<Method> getAllDeclaredMethods(Class<?> clazz) {
+		List<Method> result = new ArrayList<>();
+		while (clazz != null) {
+			Method[] methods = clazz.getDeclaredMethods();
+			Collections.addAll(result, methods);
+			clazz = clazz.getSuperclass();
+		}
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a0b09f5..c1febea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -53,7 +53,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -66,6 +65,9 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
@@ -380,22 +382,27 @@ public class TypeExtractor {
 		String functionName,
 		boolean allowMissing) {
 		try {
-			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-			if (m != null) {
+			final LambdaExecutable exec;
+			try {
+				exec = checkAndExtractLambda(function);
+			} catch (TypeExtractionException e) {
+				throw new InvalidTypesException("Internal error occurred.", e);
+			}
+			if (exec != null) {
 				// check for lambda type erasure
-				validateLambdaGenericParameters(m);
+				validateLambdaGenericParameters(exec);
 
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				final int paramLen = m.getGenericParameterTypes().length - 1;
+				final int paramLen = exec.getParameterTypes().length - 1;
 
-				// method references "this" implicitly
+				// executable references "this" implicitly
 				if (paramLen < 0) {
-					// methods declaring class can also be a super class of the input type
-					// we only validate if the method exists in input type
-					validateInputContainsMethod(m, inType);
+					// executable declaring class can also be a super class of the input type
+					// we only validate if the executable exists in input type
+					validateInputContainsExecutable(exec, inType);
 				}
 				else {
-					final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+					final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
 					validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
 				}
 
@@ -403,7 +410,7 @@ public class TypeExtractor {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
 				return new TypeExtractor().privateCreateTypeInfo(
-					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
 					inType,
 					null);
 			}
@@ -496,22 +503,27 @@ public class TypeExtractor {
 		String functionName,
 		boolean allowMissing) {
 		try {
-			final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
-			if (m != null) {
+			final LambdaExecutable exec;
+			try {
+				exec = checkAndExtractLambda(function);
+			} catch (TypeExtractionException e) {
+				throw new InvalidTypesException("Internal error occurred.", e);
+			}
+			if (exec != null) {
 				// check for lambda type erasure
-				validateLambdaGenericParameters(m);
+				validateLambdaGenericParameters(exec);
 				
 				// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
-				final int paramLen = m.getGenericParameterTypes().length - 1;
-				final Type input1 = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
-				final Type input2 = (outputTypeArgumentIndex >= 0 ) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+				final int paramLen = exec.getParameterTypes().length - 1;
+				final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1];
+				final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
 				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
 				validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
 				if(function instanceof ResultTypeQueryable) {
 					return ((ResultTypeQueryable<OUT>) function).getProducedType();
 				}
 				return new TypeExtractor().privateCreateTypeInfo(
-					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+					(outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
 					in1Type,
 					in2Type);
 			}
@@ -1358,14 +1370,20 @@ public class TypeExtractor {
 		}
 	}
 
-	private static void validateInputContainsMethod(Method m, TypeInformation<?> typeInfo) {
+	private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) {
 		List<Method> methods = getAllDeclaredMethods(typeInfo.getTypeClass());
 		for (Method method : methods) {
-			if (method.equals(m)) {
+			if (exec.executablesEquals(method)) {
 				return;
 			}
 		}
-		throw new InvalidTypesException("Type contains no method '" + m.getName() + "'.");
+		Constructor<?>[] constructors = typeInfo.getTypeClass().getDeclaredConstructors();
+		for (Constructor<?> constructor : constructors) {
+			if (exec.executablesEquals(constructor)) {
+				return;
+			}
+		}
+		throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'.");
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1488,14 +1506,14 @@ public class TypeExtractor {
 		}
 	}
 	
-	private static void validateLambdaGenericParameters(Method m) {
+	private static void validateLambdaGenericParameters(LambdaExecutable exec) {
 		// check the arguments
-		for (Type t : m.getGenericParameterTypes()) {
+		for (Type t : exec.getParameterTypes()) {
 			validateLambdaGenericParameter(t);
 		}
 
 		// check the return type
-		validateLambdaGenericParameter(m.getGenericReturnType());
+		validateLambdaGenericParameter(exec.getReturnType());
 	}
 
 	private static void validateLambdaGenericParameter(Type t) {
@@ -1974,20 +1992,6 @@ public class TypeExtractor {
 		return false;
 	}
 
-	
-	// recursively determine all declared methods
-	private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
-		List<Method> result = new ArrayList<Method>();
-		while (clazz != null) {
-			Method[] methods = clazz.getDeclaredMethods();
-			for (Method method : methods) {
-				result.add(method);
-			}
-			clazz = clazz.getSuperclass();
-		}
-		return result;
-	}
-
 	@Internal
 	public static Class<?> typeToClass(Type t) {
 		if (t instanceof Class) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64b7ae7..0d7415a 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.type.lambdas;
 
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
@@ -33,7 +35,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -74,14 +75,14 @@ public class LambdaExtractionTest {
 			MapFunction<Integer, String> instanceLambda = Object::toString;
 			MapFunction<String, Integer> constructorLambda = Integer::new;
 
-			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
-			assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
-			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
-			assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
-			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(staticLambda));
-			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(instanceLambda));
-			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(constructorLambda));
-			assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+			assertNull(checkAndExtractLambda(anonymousFromInterface));
+			assertNull(checkAndExtractLambda(anonymousFromClass));
+			assertNull(checkAndExtractLambda(fromProperClass));
+			assertNull(checkAndExtractLambda(fromDerived));
+			assertNotNull(checkAndExtractLambda(staticLambda));
+			assertNotNull(checkAndExtractLambda(instanceLambda));
+			assertNotNull(checkAndExtractLambda(constructorLambda));
+			assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -272,14 +273,14 @@ public class LambdaExtractionTest {
 	public void testInstanceMethodRefSameType() {
 		MapFunction<MyType, Integer> f = MyType::getKey;
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
-		Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
 
 	@Test
 	public void testInstanceMethodRefSuperType() {
 		MapFunction<Integer, String> f = Object::toString;
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
-		Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
 
 	public static class MySubtype extends MyType {
@@ -290,14 +291,14 @@ public class LambdaExtractionTest {
 	public void testInstanceMethodRefSuperTypeProtected() {
 		MapFunction<MySubtype, Integer> f = MyType::getKey2;
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
-		Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
 
 	@Test
 	public void testConstructorMethodRef() {
 		MapFunction<String, Integer> f = Integer::new;
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
-		Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index cda1f1c..87c1fa5 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -24,6 +24,20 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class MapITCase extends JavaProgramTestBase {
 
+	public static class Trade {
+
+		public String v;
+
+		public Trade(String v) {
+			this.v = v;
+		}
+
+		@Override
+		public String toString() {
+			return v;
+		}
+	}
+
 	private static final String EXPECTED_RESULT = "22\n" +
 			"22\n" +
 			"23\n" +
@@ -41,7 +55,11 @@ public class MapITCase extends JavaProgramTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
-		DataSet<String> mappedDs = stringDs.map(Object::toString).map (s -> s.replace("1", "2"));
+		DataSet<String> mappedDs = stringDs
+			.map(Object::toString)
+			.map (s -> s.replace("1", "2"))
+			.map(Trade::new)
+			.map(Trade::toString);
 		mappedDs.writeAsText(resultPath);
 		env.execute();
 	}