You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/06/08 09:41:06 UTC
[2/2] flink git commit: [FLINK-6783] Changed passing index of type
argument while extracting return type.
[FLINK-6783] Changed passing index of type argument while extracting return type.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcaf816d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcaf816d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcaf816d
Branch: refs/heads/master
Commit: bcaf816dc5313c6c7de1e3436cc87036fd2ea3d0
Parents: 1cc1bb4
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Thu Jun 1 13:17:25 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 8 10:42:41 2017 +0200
----------------------------------------------------------------------
.../flink/api/common/functions/Partitioner.java | 6 +-
.../api/java/typeutils/TypeExtractionUtils.java | 74 +++
.../flink/api/java/typeutils/TypeExtractor.java | 642 ++++++++++++-------
.../java/type/lambdas/LambdaExtractionTest.java | 13 +
.../org/apache/flink/cep/CEPLambdaTest.java | 2 -
.../org/apache/flink/cep/PatternStream.java | 24 +-
.../cep/operator/CEPMigration11to13Test.java | 4 +-
.../flink/graph/asm/translate/Translate.java | 44 +-
.../api/datastream/AllWindowedStream.java | 57 +-
.../api/datastream/AsyncDataStream.java | 13 +-
.../api/datastream/CoGroupedStreams.java | 15 +-
.../api/datastream/ConnectedStreams.java | 48 +-
.../streaming/api/datastream/DataStream.java | 16 +-
.../streaming/api/datastream/JoinedStreams.java | 40 +-
.../streaming/api/datastream/KeyedStream.java | 16 +-
.../api/datastream/WindowedStream.java | 58 +-
.../windowing/WindowTranslationTest.java | 232 +++----
17 files changed, 846 insertions(+), 458 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
index 6c237ed..c272d3a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/Partitioner.java
@@ -22,15 +22,15 @@ import org.apache.flink.annotation.Public;
/**
* Function to implement a custom partition assignment for keys.
- *
+ *
* @param <K> The type of the key to be partitioned.
*/
@Public
-public interface Partitioner<K> extends java.io.Serializable {
+public interface Partitioner<K> extends java.io.Serializable, Function {
/**
* Computes the partition for the given key.
- *
+ *
* @param key The key.
* @param numPartitions The number of partitions to partition into.
* @return The partition index.
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 0aac257..c6ffd55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
@@ -28,6 +29,8 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+
import static org.objectweb.asm.Type.getConstructorDescriptor;
import static org.objectweb.asm.Type.getMethodDescriptor;
@@ -161,6 +164,77 @@ public class TypeExtractionUtils {
}
/**
+ * Extracts type from given index from lambda. It supports nested types.
+ *
+ * @param exec lambda function to extract the type from
+ * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
+ * @param paramLen count of total parameters of the lambda (including closure parameters)
+ * @param baseParametersLen count of lambda interface parameters (without closure parameters)
+ * @return extracted type
+ */
+ public static Type extractTypeFromLambda(
+ LambdaExecutable exec,
+ int[] lambdaTypeArgumentIndices,
+ int paramLen,
+ int baseParametersLen) {
+ Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
+ for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+ output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
+ }
+ return output;
+ }
+
+ /**
+ * This method extracts the n-th type argument from the given type. An InvalidTypesException
+ * is thrown if the type does not have any type arguments or if the index exceeds the number
+ * of type arguments.
+ *
+ * @param t Type to extract the type arguments from
+ * @param index Index of the type argument to extract
+ * @return The extracted type argument
+ * @throws InvalidTypesException if the given type does not have any type arguments or if the
+ * index exceeds the number of type arguments.
+ */
+ public static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
+ if (t instanceof ParameterizedType) {
+ Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
+
+ if (index < 0 || index >= actualTypeArguments.length) {
+ throw new InvalidTypesException("Cannot extract the type argument with index " +
+ index + " because the type has only " + actualTypeArguments.length +
+ " type arguments.");
+ } else {
+ return actualTypeArguments[index];
+ }
+ } else {
+ throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
+ }
+ }
+
+ /**
+ * Extracts a Single Abstract Method (SAM) as defined in Java Specification (4.3.2. The Class Object,
+ * 9.8 Functional Interfaces, 9.4.3 Interface Method Body) from given class.
+ *
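+ * @param baseClass class (expected to be a functional interface) to retrieve the SAM from
+ * @throws InvalidTypesException if the given class declares more than one abstract method
+ * @return the single abstract method of the given class, or null if it has no abstract method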
+ */
+ public static Method getSingleAbstractMethod(Class<?> baseClass) {
+ Method sam = null;
+ for (Method method : baseClass.getMethods()) {
+ if (Modifier.isAbstract(method.getModifiers())) {
+ if (sam == null) {
+ sam = method;
+ } else {
+ throw new InvalidTypesException(
+ "Given class: " + baseClass + " is not a FunctionalInterface. It does not have a SAM.");
+ }
+ }
+ }
+ return sam;
+ }
+
+ /**
* Returns all declared methods of a class including methods of superclasses.
*/
public static List<Method> getAllDeclaredMethods(Class<?> clazz) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 112ca38..c50dfc9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -116,6 +116,8 @@ public class TypeExtractor {
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
+ public static final int[] NO_INDEX = new int[] {};
+
protected TypeExtractor() {
// only create instances for special use cases
}
@@ -161,9 +163,18 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) mapInterface, MapFunction.class, false, false, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) mapInterface,
+ MapFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ NO_INDEX,
+ inType,
+ functionName,
+ allowMissing);
}
-
+
@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
@@ -174,7 +185,16 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) flatMapInterface,
+ FlatMapFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{1, 0},
+ inType,
+ functionName,
+ allowMissing);
}
/**
@@ -194,7 +214,16 @@ public class TypeExtractor {
@Deprecated
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) foldInterface,
+ FoldFunction.class,
+ 0,
+ 1,
+ new int[]{1},
+ NO_INDEX,
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -205,7 +234,15 @@ public class TypeExtractor {
boolean allowMissing)
{
return getUnaryOperatorReturnType(
- function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing);
+ function,
+ AggregateFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ NO_INDEX,
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -216,7 +253,15 @@ public class TypeExtractor {
boolean allowMissing)
{
return getUnaryOperatorReturnType(
- function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
+ function,
+ AggregateFunction.class,
+ 0,
+ 2,
+ NO_INDEX,
+ NO_INDEX,
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -228,7 +273,16 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) mapPartitionInterface,
+ MapPartitionFunction.class,
+ 0,
+ 1,
+ new int[]{0, 0},
+ new int[]{1, 0},
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -240,7 +294,16 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) groupReduceInterface,
+ GroupReduceFunction.class,
+ 0,
+ 1,
+ new int[]{0, 0},
+ new int[]{1, 0},
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -252,7 +315,16 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) combineInterface,
+ GroupCombineFunction.class,
+ 0,
+ 1,
+ new int[]{0, 0},
+ new int[]{1, 0},
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -266,8 +338,19 @@ public class TypeExtractor {
public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
- return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true,
- in1Type, in2Type, functionName, allowMissing);
+ return getBinaryOperatorReturnType(
+ (Function) joinInterface,
+ FlatJoinFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0},
+ new int[]{1},
+ new int[]{2, 0},
+ in1Type,
+ in2Type,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -281,8 +364,19 @@ public class TypeExtractor {
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
- return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false,
- in1Type, in2Type, functionName, allowMissing);
+ return getBinaryOperatorReturnType(
+ (Function) joinInterface,
+ JoinFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0},
+ new int[]{1},
+ NO_INDEX,
+ in1Type,
+ in2Type,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -296,8 +390,19 @@ public class TypeExtractor {
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
- return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true,
- in1Type, in2Type, functionName, allowMissing);
+ return getBinaryOperatorReturnType(
+ (Function) coGroupInterface,
+ CoGroupFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0, 0},
+ new int[]{1, 0},
+ new int[]{2, 0},
+ in1Type,
+ in2Type,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -311,8 +416,19 @@ public class TypeExtractor {
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
- return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false,
- in1Type, in2Type, functionName, allowMissing);
+ return getBinaryOperatorReturnType(
+ (Function) crossInterface,
+ CrossFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0},
+ new int[]{1},
+ NO_INDEX,
+ in1Type,
+ in2Type,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -324,7 +440,16 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
- return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing);
+ return getUnaryOperatorReturnType(
+ (Function) selectorInterface,
+ KeySelector.class,
+ 0,
+ 1,
+ new int[]{0},
+ NO_INDEX,
+ inType,
+ functionName,
+ allowMissing);
}
@PublicEvolving
@@ -333,11 +458,53 @@ public class TypeExtractor {
}
@PublicEvolving
- public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) {
- return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null);
+ public static <T> TypeInformation<T> getPartitionerTypes(
+ Partitioner<T> partitioner,
+ String functionName,
+ boolean allowMissing) {
+ try {
+ final LambdaExecutable exec;
+ try {
+ exec = checkAndExtractLambda(partitioner);
+ } catch (TypeExtractionException e) {
+ throw new InvalidTypesException("Internal error occurred.", e);
+ }
+ if (exec != null) {
+ // check for lambda type erasure
+ validateLambdaGenericParameters(exec);
+
+ // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
+ // paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
+ final int paramLen = exec.getParameterTypes().length;
+
+ final Method sam = TypeExtractionUtils.getSingleAbstractMethod(Partitioner.class);
+ // number of parameters the SAM of implemented interface has, the parameter indexing applies to this range
+ final int baseParametersLen = sam.getParameterTypes().length;
+
+ final Type keyType = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ new int[]{0},
+ paramLen,
+ baseParametersLen);
+ return new TypeExtractor().privateCreateTypeInfo(keyType, null, null);
+ } else {
+ return new TypeExtractor().privateCreateTypeInfo(
+ Partitioner.class,
+ partitioner.getClass(),
+ 0,
+ null,
+ null);
+ }
+ } catch (InvalidTypesException e) {
+ if (allowMissing) {
+ return (TypeInformation<T>) new MissingTypeInfo(functionName != null ? functionName : partitioner.toString(), e);
+ } else {
+ throw e;
+ }
+ }
}
-
-
+
+
@SuppressWarnings("unchecked")
@PublicEvolving
public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
@@ -354,49 +521,21 @@ public class TypeExtractor {
/**
* Returns the unary operator's return type.
*
- * @param function Function to extract the return type from
- * @param baseClass Base class of the function
- * @param hasIterable True if the first function parameter is an iterable, otherwise false
- * @param hasCollector True if the function has an additional collector parameter, otherwise false
- * @param inType Type of the input elements (In case of an iterable, it is the element type)
- * @param functionName Function name
- * @param allowMissing Can the type information be missing
- * @param <IN> Input type
- * @param <OUT> Output type
- * @return TypeInformation of the return type of the function
- */
- @SuppressWarnings("unchecked")
- @PublicEvolving
- public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
- Function function,
- Class<?> baseClass,
- boolean hasIterable,
- boolean hasCollector,
- TypeInformation<IN> inType,
- String functionName,
- boolean allowMissing) {
-
- return getUnaryOperatorReturnType(
- function,
- baseClass,
- hasIterable ? 0 : -1,
- hasCollector ? 0 : -1,
- inType,
- functionName,
- allowMissing);
- }
-
- /**
- * Returns the unary operator's return type.
+ * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN</b>
+ * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInputTypeArgumentIndices.
+ *
+ * <pre>
+ * <code>
+ * OUT apply(Map<String, List<IN>> value)
+ * </code>
+ * </pre>
*
* @param function Function to extract the return type from
* @param baseClass Base class of the function
- * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
- * specifying the input type if it is wrapped (Iterable, Map,
- * etc.). Otherwise -1.
- * @param outputTypeArgumentIndex Index of the type argument of function's second parameter
- * specifying the output type if it is wrapped in a Collector.
- * Otherwise -1.
+ * @param inputTypeArgumentIndex Index of input type in the class specification
+ * @param outputTypeArgumentIndex Index of output type in the class specification
+ * @param lambdaInputTypeArgumentIndices Table of indices of the type argument specifying the input type. See example.
+ * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
* @param inType Type of the input elements (In case of an iterable, it is the element type)
* @param functionName Function name
* @param allowMissing Can the type information be missing
@@ -411,6 +550,8 @@ public class TypeExtractor {
Class<?> baseClass,
int inputTypeArgumentIndex,
int outputTypeArgumentIndex,
+ int[] lambdaInputTypeArgumentIndices,
+ int[] lambdaOutputTypeArgumentIndices,
TypeInformation<IN> inType,
String functionName,
boolean allowMissing) {
@@ -422,37 +563,63 @@ public class TypeExtractor {
throw new InvalidTypesException("Internal error occurred.", e);
}
if (exec != null) {
+ Preconditions.checkArgument(
+ lambdaInputTypeArgumentIndices != null && lambdaInputTypeArgumentIndices.length >= 1,
+ "Indices for input type arguments within lambda not provided");
+ Preconditions.checkArgument(
+ lambdaOutputTypeArgumentIndices != null,
+ "Indices for output type arguments within lambda not provided");
// check for lambda type erasure
validateLambdaGenericParameters(exec);
// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
- final int paramLen = exec.getParameterTypes().length - 1;
+ // paramLen is the total number of parameters of the provided lambda, it includes parameters added through closure
+ final int paramLen = exec.getParameterTypes().length;
+
+ final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+
+ // number of parameters the SAM of implemented interface has, the parameter indexing applies to this range
+ final int baseParametersLen = sam.getParameterTypes().length;
// executable references "this" implicitly
- if (paramLen < 0) {
+ if (paramLen <= 0) {
// executable declaring class can also be a super class of the input type
// we only validate if the executable exists in input type
validateInputContainsExecutable(exec, inType);
}
else {
- final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
- validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType);
+ final Type input = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ lambdaInputTypeArgumentIndices,
+ paramLen,
+ baseParametersLen);
+ validateInputType(input, inType);
}
if (function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(
- (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
- inType,
- null);
- }
- else {
- validateInputType(baseClass, function.getClass(), 0, inType);
+
+ final Type output;
+ if (lambdaOutputTypeArgumentIndices.length > 0) {
+ output = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ lambdaOutputTypeArgumentIndices,
+ paramLen,
+ baseParametersLen);
+ } else {
+ output = exec.getReturnType();
+ }
+
+ return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
+ } else {
+ Preconditions.checkArgument(inputTypeArgumentIndex >= 0, "Input type argument index was not provided");
+ Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+ validateInputType(baseClass, function.getClass(), inputTypeArgumentIndex, inType);
if(function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 1, inType, null);
+ return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, inType, null);
}
}
catch (InvalidTypesException e) {
@@ -467,54 +634,23 @@ public class TypeExtractor {
/**
* Returns the binary operator's return type.
*
- * @param function Function to extract the return type from
- * @param baseClass Base class of the function
- * @param hasIterables True if the first function parameter is an iterable, otherwise false
- * @param hasCollector True if the function has an additional collector parameter, otherwise false
- * @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
- * @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
- * @param functionName Function name
- * @param allowMissing Can the type information be missing
- * @param <IN1> Left side input type
- * @param <IN2> Right side input type
- * @param <OUT> Output type
- * @return TypeInformation of the return type of the function
- */
- @SuppressWarnings("unchecked")
- @PublicEvolving
- public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
- Function function,
- Class<?> baseClass,
- boolean hasIterables,
- boolean hasCollector,
- TypeInformation<IN1> in1Type,
- TypeInformation<IN2> in2Type,
- String functionName,
- boolean allowMissing) {
-
- return getBinaryOperatorReturnType(
- function,
- baseClass,
- hasIterables ? 0 : -1,
- hasCollector ? 0 : -1,
- in1Type,
- in2Type,
- functionName,
- allowMissing
- );
- }
-
- /**
- * Returns the binary operator's return type.
+ * <p><b>NOTE:</b> lambda type indices allow extraction of Type from lambdas. To extract input type <b>IN1</b>
+ * from the function given below one should pass {@code new int[] {0,1,0}} as lambdaInput1TypeArgumentIndices.
+ *
+ * <pre>
+ * <code>
+ * OUT apply(Map<String, List<IN1>> value1, List<IN2> value2)
+ * </code>
+ * </pre>
*
* @param function Function to extract the return type from
* @param baseClass Base class of the function
- * @param inputTypeArgumentIndex Index of the type argument of function's first parameter
- * specifying the input type if it is wrapped (Iterable, Map,
- * etc.). Otherwise -1.
- * @param outputTypeArgumentIndex Index of the type argument of functions second parameter
- * specifying the output type if it is wrapped in a Collector.
- * Otherwise -1.
+ * @param input1TypeArgumentIndex Index of first input type in the class specification
+ * @param input2TypeArgumentIndex Index of second input type in the class specification
+ * @param outputTypeArgumentIndex Index of output type in the class specification
+ * @param lambdaInput1TypeArgumentIndices Table of indices of the type argument specifying the first input type. See example.
+ * @param lambdaInput2TypeArgumentIndices Table of indices of the type argument specifying the second input type. See example.
+ * @param lambdaOutputTypeArgumentIndices Table of indices of the type argument specifying the output type. See example.
* @param in1Type Type of the left side input elements (In case of an iterable, it is the element type)
* @param in2Type Type of the right side input elements (In case of an iterable, it is the element type)
* @param functionName Function name
@@ -529,8 +665,12 @@ public class TypeExtractor {
public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(
Function function,
Class<?> baseClass,
- int inputTypeArgumentIndex,
+ int input1TypeArgumentIndex,
+ int input2TypeArgumentIndex,
int outputTypeArgumentIndex,
+ int[] lambdaInput1TypeArgumentIndices,
+ int[] lambdaInput2TypeArgumentIndices,
+ int[] lambdaOutputTypeArgumentIndices,
TypeInformation<IN1> in1Type,
TypeInformation<IN2> in2Type,
String functionName,
@@ -543,30 +683,67 @@ public class TypeExtractor {
throw new InvalidTypesException("Internal error occurred.", e);
}
if (exec != null) {
+ Preconditions.checkArgument(
+ lambdaInput1TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
+ "Indices for first input type arguments within lambda not provided");
+ Preconditions.checkArgument(
+ lambdaInput2TypeArgumentIndices != null && lambdaInput1TypeArgumentIndices.length >= 1,
+ "Indices for second input type arguments within lambda not provided");
+ Preconditions.checkArgument(
+ lambdaOutputTypeArgumentIndices != null,
+ "Indices for output type arguments within lambda not provided");
// check for lambda type erasure
validateLambdaGenericParameters(exec);
-
+
+ final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
+ final int baseParametersLen = sam.getParameterTypes().length;
+
// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
- final int paramLen = exec.getParameterTypes().length - 1;
- final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1];
- final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen];
- validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type);
- validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type);
+ final int paramLen = exec.getParameterTypes().length;
+
+ final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ lambdaInput1TypeArgumentIndices,
+ paramLen,
+ baseParametersLen);
+ final Type input2 = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ lambdaInput2TypeArgumentIndices,
+ paramLen,
+ baseParametersLen);
+
+ validateInputType(input1, in1Type);
+ validateInputType(input2, in2Type);
if(function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
+
+ final Type output;
+ if (lambdaOutputTypeArgumentIndices.length > 0) {
+ output = TypeExtractionUtils.extractTypeFromLambda(
+ exec,
+ lambdaOutputTypeArgumentIndices,
+ paramLen,
+ baseParametersLen);
+ } else {
+ output = exec.getReturnType();
+ }
+
return new TypeExtractor().privateCreateTypeInfo(
- (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(),
+ output,
in1Type,
in2Type);
}
else {
- validateInputType(baseClass, function.getClass(), 0, in1Type);
- validateInputType(baseClass, function.getClass(), 1, in2Type);
+ Preconditions.checkArgument(input1TypeArgumentIndex >= 0, "Input 1 type argument index was not provided");
+ Preconditions.checkArgument(input2TypeArgumentIndex >= 0, "Input 2 type argument index was not provided");
+ Preconditions.checkArgument(outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
+ validateInputType(baseClass, function.getClass(), input1TypeArgumentIndex, in1Type);
+ validateInputType(baseClass, function.getClass(), input2TypeArgumentIndex, in2Type);
if(function instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) function).getProducedType();
}
- return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), 2, in1Type, in2Type);
+ return new TypeExtractor().privateCreateTypeInfo(baseClass, function.getClass(), outputTypeArgumentIndex, in1Type, in2Type);
}
}
catch (InvalidTypesException e) {
@@ -577,7 +754,7 @@ public class TypeExtractor {
}
}
}
-
+
// --------------------------------------------------------------------------------------------
// Create type information
// --------------------------------------------------------------------------------------------
@@ -586,7 +763,7 @@ public class TypeExtractor {
public static <T> TypeInformation<T> createTypeInfo(Class<T> type) {
return (TypeInformation<T>) createTypeInfo((Type) type);
}
-
+
public static TypeInformation<?> createTypeInfo(Type t) {
TypeInformation<?> ti = new TypeExtractor().privateCreateTypeInfo(t);
if (ti == null) {
@@ -628,46 +805,46 @@ public class TypeExtractor {
}
return ti;
}
-
+
// ----------------------------------- private methods ----------------------------------------
-
+
private TypeInformation<?> privateCreateTypeInfo(Type t) {
ArrayList<Type> typeHierarchy = new ArrayList<Type>();
typeHierarchy.add(t);
return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
}
-
+
// for (Rich)Functions
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
ArrayList<Type> typeHierarchy = new ArrayList<Type>();
Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
-
+
TypeInformation<OUT> typeInfo;
-
+
// return type is a variable -> try to get the type info from the input directly
if (returnType instanceof TypeVariable<?>) {
typeInfo = (TypeInformation<OUT>) createTypeInfoFromInputs((TypeVariable<?>) returnType, typeHierarchy, in1Type, in2Type);
-
+
if (typeInfo != null) {
return typeInfo;
}
}
-
+
// get info from hierarchy
return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
-
+
// for LambdaFunctions
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
ArrayList<Type> typeHierarchy = new ArrayList<Type>();
-
+
// get info from hierarchy
return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
-
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
@@ -680,29 +857,29 @@ public class TypeExtractor {
// check if type is a subclass of tuple
else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
Type curT = t;
-
+
// do not allow usage of Tuple as type
if (typeToClass(t).equals(Tuple.class)) {
throw new InvalidTypesException(
"Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
}
-
+
// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
- // collect the types while moving up for a later top-down
+ // collect the types while moving up for a later top-down
while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) {
typeHierarchy.add(curT);
curT = typeToClass(curT).getGenericSuperclass();
}
-
+
if(curT == Tuple0.class) {
return new TupleTypeInfo(Tuple0.class);
}
-
+
// check if immediate child of Tuple has generics
if (curT instanceof Class<?>) {
throw new InvalidTypesException("Tuple needs to be parameterized by using generics.");
}
-
+
typeHierarchy.add(curT);
// create the type information for the subtypes
@@ -718,13 +895,13 @@ public class TypeExtractor {
}
// return tuple info
return new TupleTypeInfo(typeToClass(t), subTypesInfo);
-
+
}
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
else if (t instanceof TypeVariable) {
Type typeVar = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) t);
-
+
if (!(typeVar instanceof TypeVariable)) {
return createTypeInfoWithTypeHierarchy(typeHierarchy, typeVar, in1Type, in2Type);
}
@@ -741,12 +918,12 @@ public class TypeExtractor {
}
}
}
- // arrays with generics
+ // arrays with generics
else if (t instanceof GenericArrayType) {
GenericArrayType genericArray = (GenericArrayType) t;
-
+
Type componentType = genericArray.getGenericComponentType();
-
+
// due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class
if (componentType instanceof Class) {
Class<?> componentClass = (Class<?>) componentType;
@@ -775,11 +952,11 @@ public class TypeExtractor {
else if (t instanceof Class) {
return privateGetForClass((Class<OUT>) t, typeHierarchy);
}
-
+
throw new InvalidTypesException("Type Information could not be created.");
}
- private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
+ private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar);
@@ -791,12 +968,12 @@ public class TypeExtractor {
else {
returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
}
-
+
// no input information exists
if (in1TypeInfo == null && in2TypeInfo == null) {
return null;
}
-
+
// create a new type hierarchy for the input
ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
// copy the function part of the type hierarchy
@@ -809,7 +986,7 @@ public class TypeExtractor {
}
}
ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
-
+
TypeInformation<?> info = null;
if (in1TypeInfo != null) {
// find the deepest type variable that describes the type of input 1
@@ -898,18 +1075,18 @@ public class TypeExtractor {
// the input is a tuple
else if (inTypeInfo instanceof TupleTypeInfo && isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) {
ParameterizedType tupleBaseClass;
-
+
// get tuple from possible tuple subclass
while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
inputTypeHierarchy.add(inType);
inType = typeToClass(inType).getGenericSuperclass();
}
inputTypeHierarchy.add(inType);
-
+
// we can assume to be parameterized since we
// already did input validation
tupleBaseClass = (ParameterizedType) inType;
-
+
Type[] tupleElements = tupleBaseClass.getActualTypeArguments();
// go thru all tuple elements and search for type variables
for (int i = 0; i < tupleElements.length; i++) {
@@ -1068,13 +1245,13 @@ public class TypeExtractor {
public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) {
return getParameterType(baseClass, null, clazz, pos);
}
-
+
private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
if (typeHierarchy != null) {
typeHierarchy.add(clazz);
}
Type[] interfaceTypes = clazz.getGenericInterfaces();
-
+
// search in interfaces for base class
for (Type t : interfaceTypes) {
Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
@@ -1082,18 +1259,18 @@ public class TypeExtractor {
return parameter;
}
}
-
+
// search in superclass for base class
Type t = clazz.getGenericSuperclass();
Type parameter = getParameterTypeFromGenericType(baseClass, typeHierarchy, t, pos);
if (parameter != null) {
return parameter;
}
-
- throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " +
+
+ throw new InvalidTypesException("The types of the interface " + baseClass.getName() + " could not be inferred. " +
"Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
}
-
+
private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
// base class
if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) {
@@ -1109,7 +1286,7 @@ public class TypeExtractor {
typeHierarchy.add(t);
}
return getParameterType(baseClass, typeHierarchy, (Class<?>) ((ParameterizedType) t).getRawType(), pos);
- }
+ }
else if (t instanceof Class<?> && baseClass.isAssignableFrom((Class<?>) t)) {
if (typeHierarchy != null) {
typeHierarchy.add(t);
@@ -1118,11 +1295,11 @@ public class TypeExtractor {
}
return null;
}
-
+
// --------------------------------------------------------------------------------------------
// Validate input
// --------------------------------------------------------------------------------------------
-
+
private static void validateInputType(Type t, TypeInformation<?> inType) {
ArrayList<Type> typeHierarchy = new ArrayList<Type>();
try {
@@ -1132,7 +1309,7 @@ public class TypeExtractor {
throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
}
}
-
+
private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
ArrayList<Type> typeHierarchy = new ArrayList<Type>();
@@ -1152,21 +1329,21 @@ public class TypeExtractor {
throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
}
}
-
+
@SuppressWarnings("unchecked")
private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
if (type == null) {
throw new InvalidTypesException("Unknown Error. Type is null.");
}
-
+
if (typeInfo == null) {
throw new InvalidTypesException("Unknown Error. TypeInformation is null.");
}
-
+
if (!(type instanceof TypeVariable<?>)) {
// check for Java Basic Types
if (typeInfo instanceof BasicTypeInfo) {
-
+
TypeInformation<?> actual;
// check if basic type at all
if (!(type instanceof Class<?>) || (actual = BasicTypeInfo.getInfoFor((Class<?>) type)) == null) {
@@ -1176,7 +1353,7 @@ public class TypeExtractor {
if (!typeInfo.equals(actual)) {
throw new InvalidTypesException("Basic type '" + typeInfo + "' expected but was '" + actual + "'.");
}
-
+
}
// check for Java SQL time types
else if (typeInfo instanceof SqlTimeTypeInfo) {
@@ -1198,36 +1375,36 @@ public class TypeExtractor {
if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
throw new InvalidTypesException("Tuple type expected.");
}
-
+
// do not allow usage of Tuple as type
if (isClassType(type) && typeToClass(type).equals(Tuple.class)) {
throw new InvalidTypesException("Concrete subclass of Tuple expected.");
}
-
+
// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) {
typeHierarchy.add(type);
type = typeToClass(type).getGenericSuperclass();
}
-
+
if(type == Tuple0.class) {
return;
}
-
+
// check if immediate child of Tuple has generics
if (type instanceof Class<?>) {
throw new InvalidTypesException("Parameterized Tuple type expected.");
}
-
+
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) typeInfo;
-
+
Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
-
+
if (subTypes.length != tti.getArity()) {
throw new InvalidTypesException("Tuple arity '" + tti.getArity() + "' expected but was '"
+ subTypes.length + "'.");
}
-
+
for (int i = 0; i < subTypes.length; i++) {
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
@@ -1258,16 +1435,16 @@ public class TypeExtractor {
&& !(type instanceof GenericArrayType && (component = ((GenericArrayType) type).getGenericComponentType()) != null)) {
throw new InvalidTypesException("Array type expected.");
}
-
+
if (component instanceof TypeVariable<?>) {
component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
if (component instanceof TypeVariable) {
return;
}
}
-
+
validateInfo(typeHierarchy, component, ((BasicArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
-
+
}
// check for object array
else if (typeInfo instanceof ObjectArrayTypeInfo<?, ?>) {
@@ -1275,7 +1452,7 @@ public class TypeExtractor {
if (!(type instanceof Class<?> && ((Class<?>) type).isArray()) && !(type instanceof GenericArrayType)) {
throw new InvalidTypesException("Object array type expected.");
}
-
+
// check component
Type component;
if (type instanceof Class<?>) {
@@ -1283,14 +1460,14 @@ public class TypeExtractor {
} else {
component = ((GenericArrayType) type).getGenericComponentType();
}
-
+
if (component instanceof TypeVariable<?>) {
component = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) component);
if (component instanceof TypeVariable) {
return;
}
}
-
+
validateInfo(typeHierarchy, component, ((ObjectArrayTypeInfo<?, ?>) typeInfo).getComponentInfo());
}
// check for value
@@ -1299,7 +1476,7 @@ public class TypeExtractor {
if (!(type instanceof Class<?> && Value.class.isAssignableFrom((Class<?>) type))) {
throw new InvalidTypesException("Value type expected.");
}
-
+
TypeInformation<?> actual;
// check value type contents
if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
@@ -1456,33 +1633,6 @@ public class TypeExtractor {
return fieldCount;
}
- /**
- * * This method extracts the n-th type argument from the given type. An InvalidTypesException
- * is thrown if the type does not have any type arguments or if the index exceeds the number
- * of type arguments.
- *
- * @param t Type to extract the type arguments from
- * @param index Index of the type argument to extract
- * @return The extracted type argument
- * @throws InvalidTypesException if the given type does not have any type arguments or if the
- * index exceeds the number of type arguments.
- */
- private static Type extractTypeArgument(Type t, int index) throws InvalidTypesException {
- if(t instanceof ParameterizedType) {
- Type[] actualTypeArguments = ((ParameterizedType) t).getActualTypeArguments();
-
- if (index < 0 || index >= actualTypeArguments.length) {
- throw new InvalidTypesException("Cannot extract the type argument with index " +
- index + " because the type has only " + actualTypeArguments.length +
- " type arguments.");
- } else {
- return actualTypeArguments[index];
- }
- } else {
- throw new InvalidTypesException("The given type " + t + " is not a parameterized type.");
- }
- }
-
private static void validateLambdaGenericParameters(LambdaExecutable exec) {
// check the arguments
for (Type t : exec.getParameterTypes()) {
@@ -1516,19 +1666,19 @@ public class TypeExtractor {
// iterate thru hierarchy from top to bottom until type variable gets a class assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
Type curT = typeHierarchy.get(i);
-
+
// parameterized type
if (curT instanceof ParameterizedType) {
Class<?> rawType = ((Class<?>) ((ParameterizedType) curT).getRawType());
-
+
for (int paramIndex = 0; paramIndex < rawType.getTypeParameters().length; paramIndex++) {
-
+
TypeVariable<?> curVarOfCurT = rawType.getTypeParameters()[paramIndex];
-
+
// check if variable names match
if (sameTypeVars(curVarOfCurT, inTypeTypeVar)) {
Type curVarType = ((ParameterizedType) curT).getActualTypeArguments()[paramIndex];
-
+
// another type variable level
if (curVarType instanceof TypeVariable<?>) {
inTypeTypeVar = (TypeVariable<?>) curVarType;
@@ -1545,14 +1695,14 @@ public class TypeExtractor {
// return the type variable of the deepest level
return inTypeTypeVar;
}
-
+
/**
* Creates type information from a given Class such as Integer, String[] or POJOs.
- *
- * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies.
+ *
+ * This method does not support ParameterizedTypes such as Tuples or complex type hierarchies.
* In most cases {@link TypeExtractor#createTypeInfo(Type)} is the recommended method for type extraction
* (a Class is a child of Type).
- *
+ *
* @param clazz a Class to create TypeInformation for
* @return TypeInformation that describes the passed Class
*/
@@ -1561,7 +1711,7 @@ public class TypeExtractor {
typeHierarchy.add(clazz);
return new TypeExtractor().privateGetForClass(clazz, typeHierarchy);
}
-
+
private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
return privateGetForClass(clazz, typeHierarchy, null, null, null);
}
@@ -1600,13 +1750,13 @@ public class TypeExtractor {
if (primitiveArrayInfo != null) {
return primitiveArrayInfo;
}
-
+
// basic type arrays: String[], Integer[], Double[]
BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
if (basicArrayInfo != null) {
return basicArrayInfo;
}
-
+
// object arrays
else {
TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy(
@@ -1618,7 +1768,7 @@ public class TypeExtractor {
return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
}
}
-
+
// check for writable types
if (isHadoopWritable(clazz)) {
return createHadoopWritableTypeInfo(clazz);
@@ -1635,13 +1785,13 @@ public class TypeExtractor {
if (timeTypeInfo != null) {
return timeTypeInfo;
}
-
+
// check for subclasses of Value
if (Value.class.isAssignableFrom(clazz)) {
Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
}
-
+
// check for subclasses of Tuple
if (Tuple.class.isAssignableFrom(clazz)) {
if(clazz == Tuple0.class) {
@@ -1680,13 +1830,13 @@ public class TypeExtractor {
// return a generic type
return new GenericTypeInfo<OUT>(clazz);
}
-
+
/**
* Checks if the given field is a valid pojo field:
* - it is public
* OR
* - there are getter and setter methods for the field.
- *
+ *
* @param f field to check
* @param clazz class of field
* @param typeHierarchy type hierarchy for materializing generic types
@@ -1753,7 +1903,7 @@ public class TypeExtractor {
LOG.info("Class " + clazz.getName() + " is not public, cannot treat it as a POJO type. Will be handled as GenericType");
return new GenericTypeInfo<OUT>(clazz);
}
-
+
// add the hierarchy of the POJO itself if it is generic
if (parameterizedType != null) {
getTypeHierarchy(typeHierarchy, parameterizedType, Object.class);
@@ -1762,7 +1912,7 @@ public class TypeExtractor {
else if (typeHierarchy.size() <= 1) {
getTypeHierarchy(typeHierarchy, clazz, Object.class);
}
-
+
List<Field> fields = getAllDeclaredFields(clazz, false);
if (fields.size() == 0) {
LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType");
@@ -1822,7 +1972,7 @@ public class TypeExtractor {
LOG.info("The default constructor of " + clazz + " should be Public to be used as a POJO.");
return null;
}
-
+
// everything is checked, we return the pojo
return pojoType;
}
@@ -1870,7 +2020,7 @@ public class TypeExtractor {
}
return null;
}
-
+
private static boolean hasFieldWithSameName(String name, List<Field> fields) {
for(Field field : fields) {
if(name.equals(field.getName())) {
@@ -1879,7 +2029,7 @@ public class TypeExtractor {
}
return false;
}
-
+
private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
for (int j = 0; j < pojoInfo.getArity(); j++) {
PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j);
@@ -1911,20 +2061,20 @@ public class TypeExtractor {
Tuple t = (Tuple) value;
int numFields = t.getArity();
if(numFields != countFieldsInClass(value.getClass())) {
- // not a tuple since it has more fields.
+ // not a tuple since it has more fields.
return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because
// there is currently no other type that can handle such a class.
}
-
+
TypeInformation<?>[] infos = new TypeInformation[numFields];
for (int i = 0; i < numFields; i++) {
Object field = t.getField(i);
-
+
if (field == null) {
throw new InvalidTypesException("Automatic type extraction is not possible on candidates with null values. "
+ "Please specify the types directly.");
}
-
+
infos[i] = privateGetForObject(field);
}
return new TupleTypeInfo(value.getClass(), infos);
@@ -2013,10 +2163,10 @@ public class TypeExtractor {
static void validateIfWritable(TypeInformation<?> typeInfo, Type type) {
try {
// try to load the writable type info
-
+
Class<?> writableTypeInfoClass = Class
.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false, typeInfo.getClass().getClassLoader());
-
+
if (writableTypeInfoClass.isAssignableFrom(typeInfo.getClass())) {
// this is actually a writable type info
// check if the type is a writable
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 64ff605..7500d73 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -261,6 +262,18 @@ public class LambdaExtractionTest {
Assert.assertTrue(ti instanceof MissingTypeInfo);
}
+ @Test
+ public void testPartitionerLambda() {
+ Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
+ final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
+
+ Assert.assertTrue(ti.isTupleType());
+ Assert.assertEquals(2, ti.getArity());
+ Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
+ Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+
+ }
+
private static class MyType {
private int key;
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 4eff037..37bf872 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -53,7 +53,6 @@ public class CEPLambdaTest extends TestLogger {
* Tests that a Java8 lambda can be passed as a CEP select function.
*/
@Test
- @Ignore
public void testLambdaSelectFunction() {
TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
@@ -81,7 +80,6 @@ public class CEPLambdaTest extends TestLogger {
* Tests that a Java8 lambda can be passed as a CEP flat select function.
*/
@Test
- @Ignore
public void testLambdaFlatSelectFunction() {
TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 71614cf..3131a94 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -83,8 +83,10 @@ public class PatternStream<T> {
TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
patternSelectFunction,
PatternSelectFunction.class,
+ 0,
1,
- -1,
+ new int[]{0, 1, 0},
+ new int[]{},
inputStream.getType(),
null,
false);
@@ -142,8 +144,10 @@ public class PatternStream<T> {
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
PatternTimeoutFunction.class,
+ 0,
1,
- -1,
+ new int[]{0, 1, 0},
+ new int[]{},
inputStream.getType(),
null,
false);
@@ -151,8 +155,10 @@ public class PatternStream<T> {
TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternSelectFunction,
PatternSelectFunction.class,
+ 0,
1,
- -1,
+ new int[]{0, 1, 0},
+ new int[]{},
inputStream.getType(),
null,
false);
@@ -184,8 +190,10 @@ public class PatternStream<T> {
TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatSelectFunction,
PatternFlatSelectFunction.class,
- 1,
0,
+ 1,
+ new int[] {0, 1, 0},
+ new int[] {1, 0},
inputStream.getType(),
null,
false);
@@ -244,8 +252,10 @@ public class PatternStream<T> {
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
PatternFlatTimeoutFunction.class,
+ 0,
1,
- -1,
+ new int[]{0, 1, 0},
+ new int[]{2, 0},
inputStream.getType(),
null,
false);
@@ -253,8 +263,10 @@ public class PatternStream<T> {
TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatSelectFunction,
PatternFlatSelectFunction.class,
+ 0,
1,
- -1,
+ new int[]{0, 1, 0},
+ new int[]{1, 0},
inputStream.getType(),
null,
false);
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 69ba42f..950b5da 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -232,7 +232,7 @@ public class CEPMigration11to13Test {
NullByteKeySelector keySelector = new NullByteKeySelector();
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
- new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
@@ -284,7 +284,7 @@ public class CEPMigration11to13Test {
OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
harness.close();
- harness = new KeyedOneInputStreamOperatorTestHarness<>(
+ harness = new KeyedOneInputStreamOperatorTestHarness<Byte, Event, Map<String, List<Event>>>(
new KeyedCEPPatternOperator<>(
Event.createTypeSerializer(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 9c4f88e..4cb4e01 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -73,7 +73,16 @@ public class Translate {
Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class;
TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(0);
- TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+ TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+ translator,
+ TranslateFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{1},
+ oldType,
+ null,
+ false);
TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1);
TupleTypeInfo<Vertex<NEW, VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
@@ -148,7 +157,16 @@ public class Translate {
Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) (Class<? extends Edge>) Edge.class;
TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(0);
- TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+ TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+ translator,
+ TranslateFunction.class,
+ 0,
+ 1,
+ new int[] {0},
+ new int[] {1},
+ oldType,
+ null,
+ false);
TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(2);
TupleTypeInfo<Edge<NEW, EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
@@ -225,7 +243,16 @@ public class Translate {
Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class;
TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(1);
- TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+ TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+ translator,
+ TranslateFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{1},
+ oldType,
+ null,
+ false);
TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, idType, newType);
@@ -300,7 +327,16 @@ public class Translate {
Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? extends Edge>) Edge.class;
TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0);
TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(2);
- TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
+ TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(
+ translator,
+ TranslateFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{1},
+ oldType,
+ null,
+ false);
TupleTypeInfo<Edge<K, NEW>> returnType = new TupleTypeInfo<>(edgeClass, idType, idType, newType);
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31dbb4f..ae97109 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -236,8 +236,7 @@ public class AllWindowedStream<T, W extends Window> {
AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, inType, null, false);
+ TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
return reduce(reduceFunction, function, resultType);
}
@@ -332,8 +331,7 @@ public class AllWindowedStream<T, W extends Window> {
ReduceFunction<T> reduceFunction,
ProcessAllWindowFunction<T, R, W> function) {
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false);
+ TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, input.getType());
return reduce(reduceFunction, function, resultType);
}
@@ -507,12 +505,41 @@ public class AllWindowedStream<T, W extends Window> {
TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
aggFunction, input.getType(), null, false);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- windowFunction, AllWindowFunction.class, true, true, aggResultType, null, false);
+ TypeInformation<R> resultType = getAllWindowFunctionReturnType(windowFunction, aggResultType);
return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
+ private static <IN, OUT> TypeInformation<OUT> getAllWindowFunctionReturnType(
+ AllWindowFunction<IN, OUT, ?> function,
+ TypeInformation<IN> inType) {
+ return TypeExtractor.getUnaryOperatorReturnType(
+ function,
+ AllWindowFunction.class,
+ 0,
+ 1,
+ new int[]{1, 0},
+ new int[]{2, 0},
+ inType,
+ null,
+ false);
+ }
+
+ private static <IN, OUT> TypeInformation<OUT> getProcessAllWindowFunctionReturnType(
+ ProcessAllWindowFunction<IN, OUT, ?> function,
+ TypeInformation<IN> inType) {
+ return TypeExtractor.getUnaryOperatorReturnType(
+ function,
+ ProcessAllWindowFunction.class,
+ 0,
+ 1,
+ new int[]{1, 0},
+ new int[]{2, 0},
+ inType,
+ null,
+ false);
+ }
+
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
@@ -642,8 +669,7 @@ public class AllWindowedStream<T, W extends Window> {
TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
aggFunction, input.getType(), null, false);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false);
+ TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(windowFunction, aggResultType);
return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
@@ -811,8 +837,7 @@ public class AllWindowedStream<T, W extends Window> {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
Utils.getCallLocationName(), true);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+ TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, foldAccumulatorType);
return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
@@ -923,8 +948,7 @@ public class AllWindowedStream<T, W extends Window> {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
Utils.getCallLocationName(), true);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+ TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, foldAccumulatorType);
return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
@@ -1032,8 +1056,7 @@ public class AllWindowedStream<T, W extends Window> {
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, getInputType(), null, false);
+ TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, getInputType());
return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
@@ -1069,8 +1092,7 @@ public class AllWindowedStream<T, W extends Window> {
public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false);
+ TypeInformation<R> resultType = getProcessAllWindowFunctionReturnType(function, getInputType());
return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
}
@@ -1160,8 +1182,7 @@ public class AllWindowedStream<T, W extends Window> {
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, AllWindowFunction.class, true, true, inType, null, false);
+ TypeInformation<R> resultType = getAllWindowFunctionReturnType(function, inType);
return apply(reduceFunction, function, resultType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index 8461d2c..cb18a3f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -67,9 +67,16 @@ public class AsyncDataStream {
int bufSize,
OutputMode mode) {
- TypeInformation<OUT> outTypeInfo =
- TypeExtractor.getUnaryOperatorReturnType(func, AsyncFunction.class, false,
- true, in.getType(), Utils.getCallLocationName(), true);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+ func,
+ AsyncFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{1, 0},
+ in.getType(),
+ Utils.getCallLocationName(),
+ true);
// create transform
AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 8dad1cb..4bbb123 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -235,15 +235,12 @@ public class CoGroupedStreams<T1, T2> {
*/
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
- TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
- function,
- CoGroupFunction.class,
- true,
- true,
- input1.getType(),
- input2.getType(),
- "CoGroup",
- false);
+ TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
+ function,
+ input1.getType(),
+ input2.getType(),
+ "CoGroup",
+ false);
return apply(function, resultType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 0b882c8..e244bd2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -203,9 +203,19 @@ public class ConnectedStreams<IN1, IN2> {
*/
public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
- TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper,
- CoMapFunction.class, false, true, getType1(), getType2(),
- Utils.getCallLocationName(), true);
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+ coMapper,
+ CoMapFunction.class,
+ 0,
+ 1,
+ 2,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ getType1(),
+ getType2(),
+ Utils.getCallLocationName(),
+ true);
return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
@@ -227,9 +237,19 @@ public class ConnectedStreams<IN1, IN2> {
public <R> SingleOutputStreamOperator<R> flatMap(
CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
- TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
- CoFlatMapFunction.class, false, true, getType1(), getType2(),
- Utils.getCallLocationName(), true);
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+ coFlatMapper,
+ CoFlatMapFunction.class,
+ 0,
+ 1,
+ 2,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ getType1(),
+ getType2(),
+ Utils.getCallLocationName(),
+ true);
return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
}
@@ -254,9 +274,19 @@ public class ConnectedStreams<IN1, IN2> {
public <R> SingleOutputStreamOperator<R> process(
CoProcessFunction<IN1, IN2, R> coProcessFunction) {
- TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
- CoProcessFunction.class, false, true, getType1(), getType2(),
- Utils.getCallLocationName(), true);
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
+ coProcessFunction,
+ CoProcessFunction.class,
+ 0,
+ 1,
+ 2,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ TypeExtractor.NO_INDEX,
+ getType1(),
+ getType2(),
+ Utils.getCallLocationName(),
+ true);
return process(coProcessFunction, outTypeInfo);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0cdc9a1..66cd8e6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -574,13 +574,15 @@ public class DataStream<T> {
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
- processFunction,
- ProcessFunction.class,
- false,
- true,
- getType(),
- Utils.getCallLocationName(),
- true);
+ processFunction,
+ ProcessFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{2, 0},
+ getType(),
+ Utils.getCallLocationName(),
+ true);
return process(processFunction, outType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index e1ffe86..f23ebcf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -221,14 +221,18 @@ public class JoinedStreams<T1, T2> {
*/
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
- function,
- JoinFunction.class,
- true,
- true,
- input1.getType(),
- input2.getType(),
- "Join",
- false);
+ function,
+ JoinFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0},
+ new int[]{1},
+ TypeExtractor.NO_INDEX,
+ input1.getType(),
+ input2.getType(),
+ "Join",
+ false);
return apply(function, resultType);
}
@@ -300,14 +304,18 @@ public class JoinedStreams<T1, T2> {
*/
public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
- function,
- FlatJoinFunction.class,
- true,
- true,
- input1.getType(),
- input2.getType(),
- "Join",
- false);
+ function,
+ FlatJoinFunction.class,
+ 0,
+ 1,
+ 2,
+ new int[]{0},
+ new int[]{1},
+ new int[]{2, 0},
+ input1.getType(),
+ input2.getType(),
+ "Join",
+ false);
return apply(function, resultType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 698deb8..851e614 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -267,13 +267,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
- processFunction,
- ProcessFunction.class,
- false,
- true,
- getType(),
- Utils.getCallLocationName(),
- true);
+ processFunction,
+ ProcessFunction.class,
+ 0,
+ 1,
+ new int[]{0},
+ new int[]{2, 0},
+ getType(),
+ Utils.getCallLocationName(),
+ true);
return process(processFunction, outType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index f8a1914..a795064 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -295,8 +295,7 @@ public class WindowedStream<T, K, W extends Window> {
LegacyWindowOperatorType legacyWindowOpType) {
TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, inType, null, false);
+ TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
return reduce(reduceFunction, function, resultType, legacyWindowOpType);
}
@@ -396,8 +395,7 @@ public class WindowedStream<T, K, W extends Window> {
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, ProcessWindowFunction.class, true, true, input.getType(), null, false);
+ TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, input.getType(), null);
return reduce(reduceFunction, function, resultType);
}
@@ -544,8 +542,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
Utils.getCallLocationName(), true);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, foldAccumulatorType, null, false);
+ TypeInformation<R> resultType = getWindowFunctionReturnType(function, foldAccumulatorType);
return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
}
@@ -663,8 +660,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<ACC> foldResultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
Utils.getCallLocationName(), true);
- TypeInformation<R> windowResultType = TypeExtractor.getUnaryOperatorReturnType(
- windowFunction, ProcessWindowFunction.class, true, true, foldResultType, Utils.getCallLocationName(), false);
+ TypeInformation<R> windowResultType = getProcessWindowFunctionReturnType(windowFunction, foldResultType, Utils.getCallLocationName());
return fold(initialValue, foldFunction, windowFunction, foldResultType, windowResultType);
}
@@ -852,8 +848,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
aggFunction, input.getType(), null, false);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- windowFunction, WindowFunction.class, true, true, aggResultType, null, false);
+ TypeInformation<R> resultType = getWindowFunctionReturnType(windowFunction, aggResultType);
return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
@@ -981,12 +976,42 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
aggFunction, input.getType(), null, false);
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- windowFunction, ProcessWindowFunction.class, true, true, aggResultType, null, false);
+ TypeInformation<R> resultType = getProcessWindowFunctionReturnType(windowFunction, aggResultType, null);
return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
}
+ private static <IN, OUT, KEY> TypeInformation<OUT> getWindowFunctionReturnType(
+ WindowFunction<IN, OUT, KEY, ?> function,
+ TypeInformation<IN> inType) {
+ return TypeExtractor.getUnaryOperatorReturnType(
+ function,
+ WindowFunction.class,
+ 0,
+ 1,
+ new int[]{2, 0},
+ new int[]{3, 0},
+ inType,
+ null,
+ false);
+ }
+
+ private static <IN, OUT, KEY> TypeInformation<OUT> getProcessWindowFunctionReturnType(
+ ProcessWindowFunction<IN, OUT, KEY, ?> function,
+ TypeInformation<IN> inType,
+ String functionName) {
+ return TypeExtractor.getUnaryOperatorReturnType(
+ function,
+ ProcessWindowFunction.class,
+ 0,
+ 1,
+ new int[]{2, 0},
+ new int[]{3, 0},
+ inType,
+ functionName,
+ false);
+ }
+
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
@@ -1094,8 +1119,7 @@ public class WindowedStream<T, K, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, getInputType(), null, false);
+ TypeInformation<R> resultType = getWindowFunctionReturnType(function, getInputType());
return apply(function, resultType);
}
@@ -1131,8 +1155,7 @@ public class WindowedStream<T, K, W extends Window> {
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessWindowFunction<T, R, K, W> function) {
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, ProcessWindowFunction.class, true, true, getInputType(), null, false);
+ TypeInformation<R> resultType = getProcessWindowFunctionReturnType(function, getInputType(), null);
return process(function, resultType);
}
@@ -1231,8 +1254,7 @@ public class WindowedStream<T, K, W extends Window> {
@Deprecated
public <R> SingleOutputStreamOperator<R> apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
TypeInformation<T> inType = input.getType();
- TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
- function, WindowFunction.class, true, true, inType, null, false);
+ TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType);
return apply(reduceFunction, function, resultType);
}