You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/12 09:03:10 UTC
flink git commit: [FLINK-4793] [types] Improve lambda constructor
reference handling
Repository: flink
Updated Branches:
refs/heads/master 6731ec1e4 -> 1dda3ad00
[FLINK-4793] [types] Improve lambda constructor reference handling
This closes #2621.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dda3ad0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dda3ad0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dda3ad0
Branch: refs/heads/master
Commit: 1dda3ad009667697a620359e997e83a5ba2447dd
Parents: 6731ec1
Author: twalthr <tw...@apache.org>
Authored: Tue Oct 11 15:33:20 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Wed Oct 12 11:02:45 2016 +0200
----------------------------------------------------------------------
flink-core/pom.xml | 7 +
.../common/functions/util/FunctionUtils.java | 69 --------
.../java/typeutils/TypeExtractionException.java | 57 +++++++
.../api/java/typeutils/TypeExtractionUtils.java | 167 +++++++++++++++++++
.../flink/api/java/typeutils/TypeExtractor.java | 80 ++++-----
.../java/type/lambdas/LambdaExtractionTest.java | 27 +--
.../javaApiOperators/lambdas/MapITCase.java | 20 ++-
7 files changed, 306 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 865a253..cfa2cbb 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,6 +80,13 @@ under the License.
</exclusions>
</dependency>
+ <!-- ASM is needed for type extraction -->
+ <dependency>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm-all</artifactId>
+ <version>${asm.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index ffd885b..2bb1cb3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.common.functions.util;
-import java.lang.reflect.Method;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
@@ -62,73 +60,6 @@ public final class FunctionUtils {
return defaultContext;
}
}
-
- public static Method checkAndExtractLambdaMethod(Function function) {
- try {
- // get serialized lambda
- Object serializedLambda = null;
- for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
- try {
- Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
- replaceMethod.setAccessible(true);
- Object serialVersion = replaceMethod.invoke(function);
-
- // check if class is a lambda function
- if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
-
- // check if SerializedLambda class is present
- try {
- Class.forName("java.lang.invoke.SerializedLambda");
- }
- catch (Exception e) {
- throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8");
- }
- serializedLambda = serialVersion;
- break;
- }
- }
- catch (NoSuchMethodException e) {
- // thrown if the method is not there. fall through the loop
- }
- }
-
- // not a lambda method -> return null
- if (serializedLambda == null) {
- return null;
- }
-
- // find lambda method
- Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
- Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
-
- String className = (String) implClassMethod.invoke(serializedLambda);
- String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
-
- Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
-
- Method[] methods = implClass.getDeclaredMethods();
- Method parameterizedMethod = null;
- for (Method method : methods) {
- if(method.getName().equals(methodName)) {
- if(parameterizedMethod != null) {
- // It is very unlikely that a class contains multiple e.g. "lambda$2()" but its possible
- // Actually, the signature need to be checked, but this is very complex
- throw new Exception("Lambda method name is not unique.");
- }
- else {
- parameterizedMethod = method;
- }
- }
- }
- if (parameterizedMethod == null) {
- throw new Exception("No lambda method found.");
- }
- return parameterizedMethod;
- }
- catch (Exception e) {
- throw new RuntimeException("Could not extract lambda method out of function: " + e.getClass().getSimpleName() + " - " + e.getMessage(), e);
- }
- }
/**
* Private constructor to prevent instantiation.
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
new file mode 100644
index 0000000..0dad55d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Type extraction always contains some uncertainty due to unpredictable JVM differences
+ * between vendors or versions. This exception is thrown if an assumption failed during extraction.
+ */
+@Internal
+public class TypeExtractionException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new exception with no message.
+ */
+ public TypeExtractionException() {
+ super();
+ }
+
+ /**
+ * Creates a new exception with the given message.
+ *
+ * @param message The exception message.
+ */
+ public TypeExtractionException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a new exception with the given message and cause.
+ *
+ * @param message The exception message.
+ * @param e cause
+ */
+ public TypeExtractionException(String message, Throwable e) {
+ super(message, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
new file mode 100644
index 0000000..4439612
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Function;
+import static org.objectweb.asm.Type.getConstructorDescriptor;
+import static org.objectweb.asm.Type.getMethodDescriptor;
+
+@Internal
+public class TypeExtractionUtils {
+
+ private TypeExtractionUtils() {
+ // do not allow instantiation
+ }
+
+ /**
+ * Similar to a Java 8 Executable but with a return type.
+ */
+ public static class LambdaExecutable {
+
+ private Type[] parameterTypes;
+ private Type returnType;
+ private String name;
+ private Object executable;
+
+ public LambdaExecutable(Constructor<?> constructor) {
+ this.parameterTypes = constructor.getGenericParameterTypes();
+ this.returnType = constructor.getDeclaringClass();
+ this.name = constructor.getName();
+ this.executable = constructor;
+ }
+
+ public LambdaExecutable(Method method) {
+ this.parameterTypes = method.getGenericParameterTypes();
+ this.returnType = method.getGenericReturnType();
+ this.name = method.getName();
+ this.executable = method;
+ }
+
+ public Type[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ public Type getReturnType() {
+ return returnType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean executablesEquals(Method m) {
+ return executable.equals(m);
+ }
+
+ public boolean executablesEquals(Constructor<?> c) {
+ return executable.equals(c);
+ }
+ }
+
+ public static LambdaExecutable checkAndExtractLambda(Function function) throws TypeExtractionException {
+ try {
+ // get serialized lambda
+ Object serializedLambda = null;
+ for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+ try {
+ Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
+ replaceMethod.setAccessible(true);
+ Object serialVersion = replaceMethod.invoke(function);
+
+ // check if class is a lambda function
+ if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
+
+ // check if SerializedLambda class is present
+ try {
+ Class.forName("java.lang.invoke.SerializedLambda");
+ }
+ catch (Exception e) {
+ throw new TypeExtractionException("User code tries to use lambdas, but framework is running with a Java version < 8");
+ }
+ serializedLambda = serialVersion;
+ break;
+ }
+ }
+ catch (NoSuchMethodException e) {
+ // thrown if the method is not there. fall through the loop
+ }
+ }
+
+ // not a lambda method -> return null
+ if (serializedLambda == null) {
+ return null;
+ }
+
+ // find lambda method
+ Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
+ Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
+ Method implMethodSig = serializedLambda.getClass().getDeclaredMethod("getImplMethodSignature");
+
+ String className = (String) implClassMethod.invoke(serializedLambda);
+ String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
+ String methodSig = (String) implMethodSig.invoke(serializedLambda);
+
+ Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader());
+
+ // find constructor
+ if (methodName.equals("<init>")) {
+ Constructor<?>[] constructors = implClass.getDeclaredConstructors();
+ for (Constructor<?> constructor : constructors) {
+ if(getConstructorDescriptor(constructor).equals(methodSig)) {
+ return new LambdaExecutable(constructor);
+ }
+ }
+ }
+ // find method
+ else {
+ List<Method> methods = getAllDeclaredMethods(implClass);
+ for (Method method : methods) {
+ if(method.getName().equals(methodName) && getMethodDescriptor(method).equals(methodSig)) {
+ return new LambdaExecutable(method);
+ }
+ }
+ }
+ throw new TypeExtractionException("No lambda method found.");
+ }
+ catch (Exception e) {
+ throw new TypeExtractionException("Could not extract lambda method out of function: " +
+ e.getClass().getSimpleName() + " - " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Returns all declared methods of a class including methods of superclasses.
+ */
+ public static List<Method> getAllDeclaredMethods(Class<?> clazz) {
+ List<Method> result = new ArrayList<>();
+ while (clazz != null) {
+ Method[] methods = clazz.getDeclaredMethods();
+ Collections.addAll(result, methods);
+ clazz = clazz.getSuperclass();
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a0b09f5..c1febea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -53,7 +53,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -66,6 +65,9 @@ import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
@@ -380,22 +382,27 @@ public class TypeExtractor {
String functionName,
boolean allowMissing) {
try {
- final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
- if (m != null) {
+ final LambdaExecutable exec;
+ try {
+ exec = checkAndExtractLambda(function);
+ } catch (TypeExtractionException e) {
+ throw new InvalidTypesException("Internal error occurred.", e);
+ }
+ if (exec != null) {
// check for lambda type erasure
- validateLambdaGenericParameters(m);
+ validateLambdaGenericParameters(exec);
// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
- final int paramLen = m.getGenericParameterTypes().length - 1;
+ final int paramLen = exec.getParameterTypes().length - 1;
- // method references "this" implicitly
+ // executable references "this" implicitly
if (paramLen < 0) {
- // methods declaring class can also be a super class of the input type
- // we only validate if the method exists in input type
- validateInputContainsMethod(m, inType);
+ // executable declaring class can also be a super class of the input type
+ // we only validate if the executable exists in input type
+ validateInputContainsExecutable(exec, inType);
}
else {
- final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+ final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
}
@@ -403,7 +410,7 @@ public class TypeExtractor {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
return new TypeExtractor().privateCreateTypeInfo(
- (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+ (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
inType,
null);
}
@@ -496,22 +503,27 @@ public class TypeExtractor {
String functionName,
boolean allowMissing) {
try {
- final Method m = FunctionUtils.checkAndExtractLambdaMethod(function);
- if (m != null) {
+ final LambdaExecutable exec;
+ try {
+ exec = checkAndExtractLambda(function);
+ } catch (TypeExtractionException e) {
+ throw new InvalidTypesException("Internal error occurred.", e);
+ }
+ if (exec != null) {
// check for lambda type erasure
- validateLambdaGenericParameters(m);
+ validateLambdaGenericParameters(exec);
// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
- final int paramLen = m.getGenericParameterTypes().length - 1;
- final Type input1 = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1];
- final Type input2 = (outputTypeArgumentIndex >= 0 ) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen];
+ final int paramLen = exec.getParameterTypes().length - 1;
+ final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1];
+ final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
if(function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
return new TypeExtractor().privateCreateTypeInfo(
- (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(),
+ (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
in1Type,
in2Type);
}
@@ -1358,14 +1370,20 @@ public class TypeExtractor {
}
}
- private static void validateInputContainsMethod(Method m, TypeInformation<?> typeInfo) {
+ private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) {
List<Method> methods = getAllDeclaredMethods(typeInfo.getTypeClass());
for (Method method : methods) {
- if (method.equals(m)) {
+ if (exec.executablesEquals(method)) {
return;
}
}
- throw new InvalidTypesException("Type contains no method '" + m.getName() + "'.");
+ Constructor<?>[] constructors = typeInfo.getTypeClass().getDeclaredConstructors();
+ for (Constructor<?> constructor : constructors) {
+ if (exec.executablesEquals(constructor)) {
+ return;
+ }
+ }
+ throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'.");
}
// --------------------------------------------------------------------------------------------
@@ -1488,14 +1506,14 @@ public class TypeExtractor {
}
}
- private static void validateLambdaGenericParameters(Method m) {
+ private static void validateLambdaGenericParameters(LambdaExecutable exec) {
// check the arguments
- for (Type t : m.getGenericParameterTypes()) {
+ for (Type t : exec.getParameterTypes()) {
validateLambdaGenericParameter(t);
}
// check the return type
- validateLambdaGenericParameter(m.getGenericReturnType());
+ validateLambdaGenericParameter(exec.getReturnType());
}
private static void validateLambdaGenericParameter(Type t) {
@@ -1974,20 +1992,6 @@ public class TypeExtractor {
return false;
}
-
- // recursively determine all declared methods
- private static List<Method> getAllDeclaredMethods(Class<?> clazz) {
- List<Method> result = new ArrayList<Method>();
- while (clazz != null) {
- Method[] methods = clazz.getDeclaredMethods();
- for (Method method : methods) {
- result.add(method);
- }
- clazz = clazz.getSuperclass();
- }
- return result;
- }
-
@Internal
public static Class<?> typeToClass(Type t) {
if (t instanceof Class) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64b7ae7..0d7415a 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.api.java.type.lambdas;
+import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -33,7 +35,6 @@ import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
@@ -74,14 +75,14 @@ public class LambdaExtractionTest {
MapFunction<Integer, String> instanceLambda = Object::toString;
MapFunction<String, Integer> constructorLambda = Integer::new;
- assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface));
- assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass));
- assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass));
- assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived));
- assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(staticLambda));
- assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(instanceLambda));
- assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(constructorLambda));
- assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA));
+ assertNull(checkAndExtractLambda(anonymousFromInterface));
+ assertNull(checkAndExtractLambda(anonymousFromClass));
+ assertNull(checkAndExtractLambda(fromProperClass));
+ assertNull(checkAndExtractLambda(fromDerived));
+ assertNotNull(checkAndExtractLambda(staticLambda));
+ assertNotNull(checkAndExtractLambda(instanceLambda));
+ assertNotNull(checkAndExtractLambda(constructorLambda));
+ assertNotNull(checkAndExtractLambda(STATIC_LAMBDA));
}
catch (Exception e) {
e.printStackTrace();
@@ -272,14 +273,14 @@ public class LambdaExtractionTest {
public void testInstanceMethodRefSameType() {
MapFunction<MyType, Integer> f = MyType::getKey;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class));
- Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
@Test
public void testInstanceMethodRefSuperType() {
MapFunction<Integer, String> f = Object::toString;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO);
- Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
public static class MySubtype extends MyType {
@@ -290,14 +291,14 @@ public class LambdaExtractionTest {
public void testInstanceMethodRefSuperTypeProtected() {
MapFunction<MySubtype, Integer> f = MyType::getKey2;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class));
- Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
@Test
public void testConstructorMethodRef() {
MapFunction<String, Integer> f = Integer::new;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO);
- Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
index cda1f1c..87c1fa5 100644
--- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
+++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -24,6 +24,20 @@ import org.apache.flink.test.util.JavaProgramTestBase;
public class MapITCase extends JavaProgramTestBase {
+ public static class Trade {
+
+ public String v;
+
+ public Trade(String v) {
+ this.v = v;
+ }
+
+ @Override
+ public String toString() {
+ return v;
+ }
+ }
+
private static final String EXPECTED_RESULT = "22\n" +
"22\n" +
"23\n" +
@@ -41,7 +55,11 @@ public class MapITCase extends JavaProgramTestBase {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14);
- DataSet<String> mappedDs = stringDs.map(Object::toString).map (s -> s.replace("1", "2"));
+ DataSet<String> mappedDs = stringDs
+ .map(Object::toString)
+ .map (s -> s.replace("1", "2"))
+ .map(Trade::new)
+ .map(Trade::toString);
mappedDs.writeAsText(resultPath);
env.execute();
}