You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/21 12:12:44 UTC
flink git commit: [FLINK-3042] [FLINK-3060] [types] Define a way to
let types create their own TypeInformation
Repository: flink
Updated Branches:
refs/heads/master 68709b087 -> 4cc38fd36
[FLINK-3042] [FLINK-3060] [types] Define a way to let types create their own TypeInformation
This closes #2337.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cc38fd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cc38fd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cc38fd3
Branch: refs/heads/master
Commit: 4cc38fd36f3190f9c0066e9cf94580669b2410cf
Parents: 68709b0
Author: twalthr <tw...@apache.org>
Authored: Thu Aug 4 17:01:08 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Wed Sep 21 14:12:04 2016 +0200
----------------------------------------------------------------------
docs/dev/types_serialization.md | 42 ++
.../flink/api/common/typeinfo/TypeInfo.java | 44 ++
.../api/common/typeinfo/TypeInfoFactory.java | 51 ++
.../api/common/typeinfo/TypeInformation.java | 24 +-
.../flink/api/java/typeutils/TupleTypeInfo.java | 13 +-
.../flink/api/java/typeutils/TypeExtractor.java | 260 ++++++++--
.../api/java/typeutils/TypeInfoFactoryTest.java | 469 +++++++++++++++++++
.../flink/api/scala/codegen/TypeAnalyzer.scala | 25 +
.../api/scala/codegen/TypeDescriptors.scala | 8 +
.../api/scala/codegen/TypeInformationGen.scala | 22 +
.../api/scala/typeutils/CaseClassTypeInfo.scala | 6 +-
.../api/scala/typeutils/EitherTypeInfo.scala | 3 +-
.../api/scala/typeutils/EnumValueTypeInfo.scala | 2 +-
.../api/scala/typeutils/OptionTypeInfo.scala | 2 +-
.../scala/typeutils/TraversableTypeInfo.scala | 3 +-
.../flink/api/scala/typeutils/TryTypeInfo.scala | 2 +-
.../scala/typeutils/TypeInfoFactoryTest.scala | 157 +++++++
17 files changed, 1079 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 364aeb8..8a32491 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -251,3 +251,45 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
{% endhighlight %}
There are different variants of these methods available.
+
+## Defining Type Information using a Factory
+
+A type information factory allows for plugging-in user-defined type information into the Flink type system.
+You have to implement `org.apache.flink.api.common.typeinfo.TypeInfoFactory` to return your custom type information.
+The factory is called during the type extraction phase if the corresponding type has been annotated
+with the `@org.apache.flink.api.common.typeinfo.TypeInfo` annotation.
+
+Type information factories can be used in both the Java and Scala API.
+
+In a hierarchy of types the closest factory
+will be chosen while traversing upwards, however, a built-in factory has highest precedence. A factory has
+also higher precendence than Flink's built-in types, therefore you should know what you are doing.
+
+The following example shows how to annotate a custom type `MyTuple` and supply custom type information for it using a factory in Java.
+
+The annotated custom type:
+{% highlight java %}
+@TypeInfo(MyTupleTypeInfoFactory.class)
+public class MyTuple<T0, T1> {
+ public T0 myfield0;
+ public T1 myfield1;
+}
+{% endhighlight %}
+
+The factory supplying custom type information:
+{% highlight java %}
+public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
+
+ @Override
+ public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+ return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
+ }
+}
+{% endhighlight %}
+
+The method `createTypeInfo(Type, Map<String, TypeInformation<?>>)` creates type information for the type the factory is targeted for.
+The parameters provide additional information about the type itself as well as the type's generic type parameters if available.
+
+If your type contains generic parameters that might need to be derived from the input type of a Flink function, make sure to also
+implement `org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters` for a bidirectional mapping of generic
+parameters to type information.
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
new file mode 100644
index 0000000..ce46827
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Type;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Annotation for specifying a corresponding {@link TypeInfoFactory} that can produce
+ * {@link TypeInformation} for the annotated type. In a hierarchy of types the closest annotation
+ * that defines a factory will be chosen while traversing upwards, however, a globally registered
+ * factory has highest precedence (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface TypeInfo {
+
+ Class<? extends TypeInfoFactory> value();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..ea15f3a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type system. The factory is
+ * called during the type extraction phase if the corresponding type has been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+ /**
+ * Creates type information for the type the factory is targeted for. The parameters provide
+ * additional information about the type itself as well as the type's generic type parameters.
+ *
+ * @param t the exact type the type information is created for; might also be a subclass of <T>
+ * @param genericParameters mapping of the type's generic type parameters to type information
+ * extracted with Flink's type extraction facilities; null values
+ * indicate that type information could not be extracted for this parameter
+ * @return type information for the type the factory is targeted for
+ */
+ public abstract TypeInformation<T> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 95eed6b..154ceb1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -18,15 +18,16 @@
package org.apache.flink.api.common.typeinfo;
+import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import java.io.Serializable;
import java.util.Collections;
-import java.util.List;
/**
* TypeInformation is the core class of Flink's type system. Flink requires a type information
@@ -122,14 +123,25 @@ public abstract class TypeInformation<T> implements Serializable {
public abstract Class<T> getTypeClass();
/**
- * Returns the generic parameters of this type.
+ * Optional method for giving Flink's type extraction system information about the mapping
+ * of a generic type parameter to the type information of a subtype. This information is necessary
+ * in cases where type information should be deduced from an input type.
*
- * @return The list of generic parameters. This list can be empty.
+ * For instance, a method for a {@link Tuple2} would look like this:
+ * <code>
+ * Map m = new HashMap();
+ * m.put("T0", this.getTypeAt(0));
+ * m.put("T1", this.getTypeAt(1));
+ * return m;
+ * </code>
+ *
+ * @return map of inferred subtypes; it does not have to contain all generic parameters as key;
+ * values may be null if type could not be inferred
*/
@PublicEvolving
- public List<TypeInformation<?>> getGenericParameters() {
- // Return an empty list as the default implementation
- return Collections.emptyList();
+ public Map<String, TypeInformation<?>> getGenericParameters() {
+ // return an empty map as the default implementation
+ return Collections.emptyMap();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index d525ffb..e2cd789 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
@@ -169,7 +171,16 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
);
}
}
-
+
+ @Override
+ public Map<String, TypeInformation<?>> getGenericParameters() {
+ Map<String, TypeInformation<?>> m = new HashMap<>(types.length);
+ for (int i = 0; i < types.length; i++) {
+ m.put("T" + i, types[i]);
+ }
+ return m;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a722d72..a0b09f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -28,9 +28,12 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.ClassUtils;
@@ -56,6 +59,8 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.functions.KeySelector;
@@ -63,7 +68,8 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
-
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +116,34 @@ public class TypeExtractor {
}
// --------------------------------------------------------------------------------------------
+ // TypeInfoFactory registry
+ // --------------------------------------------------------------------------------------------
+
+ private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
+
+ /**
+ * Registers a type information factory globally for a certain type. Every following type extraction
+ * operation will use the provided factory for this type. The factory will have highest precedence
+ * for this type. In a hierarchy of types the registered factory has higher precedence than annotations
+ * at the same level but lower precedence than factories defined down the hierarchy.
+ *
+ * @param t type for which a new factory is registered
+ * @param factory type information factory that will produce {@link TypeInformation}
+ */
+ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
+ Preconditions.checkNotNull(t, "Type parameter must not be null.");
+ Preconditions.checkNotNull(factory, "Factory parameter must not be null.");
+
+ if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
+ throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
+ }
+ if (registeredTypeInfoFactories.containsKey(t)) {
+ throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
+ }
+ registeredTypeInfoFactories.put(t, factory);
+ }
+
+ // --------------------------------------------------------------------------------------------
// Function specific methods
// --------------------------------------------------------------------------------------------
@@ -592,9 +626,14 @@ public class TypeExtractor {
@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-
+
+ // check if type information can be created using a type factory
+ final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(t, typeHierarchy, in1Type, in2Type);
+ if (typeFromFactory != null) {
+ return typeFromFactory;
+ }
// check if type is a subclass of tuple
- if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
+ else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
Type curT = t;
// do not allow usage of Tuple as type
@@ -622,7 +661,7 @@ public class TypeExtractor {
typeHierarchy.add(curT);
// create the type information for the subtypes
- TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+ final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
@@ -655,7 +694,7 @@ public class TypeExtractor {
typeHierarchy.add(curT);
// create the type information for the subtypes
- TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+ final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
@@ -807,12 +846,40 @@ public class TypeExtractor {
return null;
}
-
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation<?> info = null;
-
+
+ // use a factory to find corresponding type information to type variable
+ final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
+ final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
+ if (factory != null) {
+ // the type that defines the factory is last in factory hierarchy
+ final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);
+ // defining type has generics, the factory need to be asked for a mapping of subtypes to type information
+ if (factoryDefiningType instanceof ParameterizedType) {
+ final Type[] typeParams = typeToClass(factoryDefiningType).getTypeParameters();
+ final Type[] actualParams = ((ParameterizedType) factoryDefiningType).getActualTypeArguments();
+ // go thru all elements and search for type variables
+ for (int i = 0; i < actualParams.length; i++) {
+ final Map<String, TypeInformation<?>> componentInfo = inTypeInfo.getGenericParameters();
+ final String typeParamName = typeParams[i].toString();
+ if (!componentInfo.containsKey(typeParamName) || componentInfo.get(typeParamName) == null) {
+ throw new InvalidTypesException("TypeInformation '" + inTypeInfo.getClass().getSimpleName() +
+ "' does not supply a mapping of TypeVariable '" + typeParamName + "' to corresponding TypeInformation. " +
+ "Input type inference can only produce a result with this information. " +
+ "Please implement method 'TypeInformation.getGenericParameters()' for this.");
+ }
+ info = createTypeInfoFromInput(returnTypeVar, factoryHierarchy, actualParams[i], componentInfo.get(typeParamName));
+ if (info != null) {
+ break;
+ }
+ }
+ }
+ }
// the input is a type variable
- if (inType instanceof TypeVariable) {
+ else if (inType instanceof TypeVariable) {
inType = materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) inType);
info = findCorrespondingInfo(returnTypeVar, inType, inTypeInfo, inputTypeHierarchy);
}
@@ -873,28 +940,30 @@ public class TypeExtractor {
* @param typeHierarchy necessary for type inference
* @param in1Type necessary for type inference
* @param in2Type necessary for type inference
+ * @param lenient decides whether exceptions should be thrown if a subtype can not be determined
* @return array containing TypeInformation of sub types or null if definingType contains
* more subtypes (fields) that defined
*/
private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
- ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+ ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, boolean lenient) {
Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
// materialize possible type variables
for (int i = 0; i < subtypes.length; i++) {
+ final Type actualTypeArg = definingType.getActualTypeArguments()[i];
// materialize immediate TypeVariables
- if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
- subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]);
+ if (actualTypeArg instanceof TypeVariable<?>) {
+ subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) actualTypeArg);
}
// class or parameterized type
else {
- subtypes[i] = definingType.getActualTypeArguments()[i];
+ subtypes[i] = actualTypeArg;
}
}
TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
- ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ final ArrayList<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
@@ -902,7 +971,7 @@ public class TypeExtractor {
subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
- if (subTypesInfo[i] == null) {
+ if (subTypesInfo[i] == null && !lenient) {
throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+ "' could not be determined. This is most likely a type erasure problem. "
@@ -910,25 +979,75 @@ public class TypeExtractor {
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
- subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+ // create the type information of the subtype or null/exception
+ try {
+ subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+ } catch (InvalidTypesException e) {
+ if (lenient) {
+ subTypesInfo[i] = null;
+ } else {
+ throw e;
+ }
+ }
}
}
- Class<?> originalTypeAsClass = null;
- if (isClassType(originalType)) {
- originalTypeAsClass = typeToClass(originalType);
+ // check that number of fields matches the number of subtypes
+ if (!lenient) {
+ Class<?> originalTypeAsClass = null;
+ if (isClassType(originalType)) {
+ originalTypeAsClass = typeToClass(originalType);
+ }
+ checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+ // check if the class we assumed to conform to the defining type so far is actually a pojo because the
+ // original type contains additional fields.
+ // check for additional fields.
+ int fieldCount = countFieldsInClass(originalTypeAsClass);
+ if(fieldCount > subTypesInfo.length) {
+ return null;
+ }
}
- checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
- // check if the class we assumed to conform to the defining type so far is actually a pojo because the
- // original type contains additional fields.
- // check for additional fields.
- int fieldCount = countFieldsInClass(originalTypeAsClass);
- if(fieldCount > subTypesInfo.length) {
+
+ return subTypesInfo;
+ }
+
+ /**
+ * Creates type information using a factory if for this type or super types. Returns null otherwise.
+ */
+ @SuppressWarnings("unchecked")
+ private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
+ Type t, ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+
+ final ArrayList<Type> factoryHierarchy = new ArrayList<>(typeHierarchy);
+ final TypeInfoFactory<? super OUT> factory = getClosestFactory(factoryHierarchy, t);
+ if (factory == null) {
return null;
}
- return subTypesInfo;
+ final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);
+
+ // infer possible type parameters from input
+ final Map<String, TypeInformation<?>> genericParams;
+ if (factoryDefiningType instanceof ParameterizedType) {
+ genericParams = new HashMap<>();
+ final ParameterizedType paramDefiningType = (ParameterizedType) factoryDefiningType;
+ final Type[] args = typeToClass(paramDefiningType).getTypeParameters();
+
+ final TypeInformation<?>[] subtypeInfo = createSubTypesInfo(t, paramDefiningType, factoryHierarchy, in1Type, in2Type, true);
+ assert subtypeInfo != null;
+ for (int i = 0; i < subtypeInfo.length; i++) {
+ genericParams.put(args[i].toString(), subtypeInfo[i]);
+ }
+ } else {
+ genericParams = Collections.emptyMap();
+ }
+
+ final TypeInformation<OUT> createdTypeInfo = (TypeInformation<OUT>) factory.createTypeInfo(t, genericParams);
+ if (createdTypeInfo == null) {
+ throw new InvalidTypesException("TypeInfoFactory returned invalid TypeInformation 'null'");
+ }
+ return createdTypeInfo;
}
-
+
// --------------------------------------------------------------------------------------------
// Extract type parameters
// --------------------------------------------------------------------------------------------
@@ -1254,6 +1373,31 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
/**
+ * Returns the type information factory for a type using the factory registry or annotations.
+ */
+ @Internal
+ public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
+ final Class<?> factoryClass;
+ if (registeredTypeInfoFactories.containsKey(t)) {
+ factoryClass = registeredTypeInfoFactories.get(t);
+ }
+ else {
+ if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
+ return null;
+ }
+ final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
+ factoryClass = typeInfoAnnotation.value();
+ // check for valid factory class
+ if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
+ throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
+ }
+ }
+
+ // instantiate
+ return (TypeInfoFactory<OUT>) InstantiationUtil.instantiate(factoryClass);
+ }
+
+ /**
* @return number of items with equal type or same raw type
*/
private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
@@ -1265,27 +1409,46 @@ public class TypeExtractor {
}
return count;
}
-
+
/**
- * @param curT : start type
- * @return Type The immediate child of the top class
+ * Traverses the type hierarchy of a type up until a certain stop class is found.
+ *
+ * @param t type for which a hierarchy need to be created
+ * @return type of the immediate child of the stop class
*/
- private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
- // skip first one
- if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
- curT = typeToClass(curT).getGenericSuperclass();
+ private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
+ while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
+ typeHierarchy.add(t);
+ t = typeToClass(t).getGenericSuperclass();
+
+ if (t == null) {
+ break;
+ }
}
- while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
- typeHierarchy.add(curT);
- curT = typeToClass(curT).getGenericSuperclass();
+ return t;
+ }
- if (curT == null) {
+ /**
+ * Traverses the type hierarchy up until a type information factory can be found.
+ *
+ * @param typeHierarchy hierarchy to be filled while traversing up
+ * @param t type for which a factory needs to be found
+ * @return closest type information factory or null if there is no factory in the type hierarchy
+ */
+ private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
+ TypeInfoFactory factory = null;
+ while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
+ typeHierarchy.add(t);
+ factory = getTypeInfoFactory(t);
+ t = typeToClass(t).getGenericSuperclass();
+
+ if (t == null) {
break;
}
}
- return curT;
+ return factory;
}
-
+
private int countFieldsInClass(Class<?> clazz) {
int fieldCount = 0;
for(Field field : clazz.getFields()) { // get all fields
@@ -1486,17 +1649,26 @@ public class TypeExtractor {
* @return TypeInformation that describes the passed Class
*/
public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
- return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>());
+ final ArrayList<Type> typeHierarchy = new ArrayList<>();
+ typeHierarchy.add(clazz);
+ return new TypeExtractor().privateGetForClass(clazz, typeHierarchy);
}
private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
return privateGetForClass(clazz, typeHierarchy, null, null, null);
}
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
checkNotNull(clazz);
+ // check if type information can be produced using a factory
+ final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
+ if (typeFromFactory != null) {
+ return typeFromFactory;
+ }
+
// Object is handled as generic type info
if (clazz.equals(Object.class)) {
return new GenericTypeInfo<>(clazz);
@@ -1859,6 +2031,14 @@ public class TypeExtractor {
private <X> TypeInformation<X> privateGetForObject(X value) {
checkNotNull(value);
+ // check if type information can be produced using a factory
+ final ArrayList<Type> typeHierarchy = new ArrayList<>();
+ typeHierarchy.add(value.getClass());
+ final TypeInformation<X> typeFromFactory = createTypeInfoFromFactory(value.getClass(), typeHierarchy, null, null);
+ if (typeFromFactory != null) {
+ return typeFromFactory;
+ }
+
// check if we can extract the types from tuples, otherwise work with the class
if (value instanceof Tuple) {
Tuple t = (Tuple) value;
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
new file mode 100644
index 0000000..f055879
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for extracting {@link org.apache.flink.api.common.typeinfo.TypeInformation} from types
+ * using a {@link org.apache.flink.api.common.typeinfo.TypeInfoFactory}
+ */
+public class TypeInfoFactoryTest {
+
+ @Test
+ public void testSimpleType() {
+ TypeInformation<?> ti = TypeExtractor.createTypeInfo(IntLike.class);
+ assertEquals(INT_TYPE_INFO, ti);
+
+ ti = TypeExtractor.getForClass(IntLike.class);
+ assertEquals(INT_TYPE_INFO, ti);
+
+ ti = TypeExtractor.getForObject(new IntLike());
+ assertEquals(INT_TYPE_INFO, ti);
+ }
+
+ @Test
+ public void testMyEitherGenericType() {
+ MapFunction<Boolean, MyEither<Boolean, String>> f = new MyEitherMapper<>();
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BOOLEAN_TYPE_INFO);
+ assertTrue(ti instanceof EitherTypeInfo);
+ EitherTypeInfo eti = (EitherTypeInfo) ti;
+ assertEquals(BOOLEAN_TYPE_INFO, eti.getLeftType());
+ assertEquals(STRING_TYPE_INFO, eti.getRightType());
+ }
+
+ @Test
+ public void testMyOptionGenericType() {
+ TypeInformation<MyOption<Tuple2<Boolean, String>>> inTypeInfo = new MyOptionTypeInfo<>(
+ new TupleTypeInfo<Tuple2<Boolean, String>>(BOOLEAN_TYPE_INFO, STRING_TYPE_INFO));
+ MapFunction<MyOption<Tuple2<Boolean, String>>, MyOption<Tuple2<Boolean, Boolean>>> f = new MyOptionMapper<>();
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+ assertTrue(ti instanceof MyOptionTypeInfo);
+ MyOptionTypeInfo oti = (MyOptionTypeInfo) ti;
+ assertTrue(oti.getInnerType() instanceof TupleTypeInfo);
+ TupleTypeInfo tti = (TupleTypeInfo) oti.getInnerType();
+ assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(0));
+ assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(1));
+ }
+
+ @Test
+ public void testMyTuple() {
+ TypeInformation<Tuple1<MyTuple<Double, String>>> inTypeInfo = new TupleTypeInfo<>(
+ new MyTupleTypeInfo(DOUBLE_TYPE_INFO, STRING_TYPE_INFO));
+ MapFunction<Tuple1<MyTuple<Double, String>>, Tuple1<MyTuple<Boolean, Double>>> f = new MyTupleMapperL2<>();
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+ assertTrue(ti instanceof TupleTypeInfo);
+ TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+ assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+ MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField0());
+ assertEquals(DOUBLE_TYPE_INFO, mtti.getField1());
+ }
+
+ @Test
+ public void testMyTupleHierarchy() {
+ TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyTuple2.class);
+ assertTrue(ti instanceof MyTupleTypeInfo);
+ MyTupleTypeInfo<?, ?> mtti = (MyTupleTypeInfo) ti;
+ assertEquals(STRING_TYPE_INFO, mtti.getField0());
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+ }
+
+ @Test
+ public void testMyTupleHierarchyWithInference() {
+ TypeInformation<Tuple1<MyTuple3<Tuple1<Float>>>> inTypeInfo = new TupleTypeInfo<>(new MyTupleTypeInfo<>(
+ new TupleTypeInfo<Tuple1<Float>>(FLOAT_TYPE_INFO), BOOLEAN_TYPE_INFO));
+ MapFunction<Tuple1<MyTuple3<Tuple1<Float>>>, Tuple1<MyTuple3<Tuple2<Float, String>>>> f = new MyTuple3Mapper<>();
+ TypeInformation ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+ assertTrue(ti instanceof TupleTypeInfo);
+ TupleTypeInfo<?> tti = (TupleTypeInfo) ti;
+ assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+ MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+ assertEquals(new TupleTypeInfo<>(FLOAT_TYPE_INFO, STRING_TYPE_INFO), mtti.getField0());
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+ }
+
+ @Test(expected = InvalidTypesException.class)
+ public void testMissingTypeInfo() {
+ MapFunction f = new MyFaultyMapper();
+ TypeExtractor.getMapReturnTypes(f, INT_TYPE_INFO);
+ }
+
+ @Test(expected = InvalidTypesException.class)
+ public void testMissingTypeInference() {
+ MapFunction f = new MyFaultyMapper2();
+ TypeExtractor.getMapReturnTypes(f, new MyFaultyTypeInfo());
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ public static class MyTuple3Mapper<Y> implements MapFunction<Tuple1<MyTuple3<Tuple1<Y>>>, Tuple1<MyTuple3<Tuple2<Y, String>>>> {
+ @Override
+ public Tuple1<MyTuple3<Tuple2<Y, String>>> map(Tuple1<MyTuple3<Tuple1<Y>>> value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class MyTuple3<T> extends MyTuple<T, Boolean> {
+ // empty
+ }
+
+ public static class MyTuple2 extends MyTuple<String, Boolean> {
+ // empty
+ }
+
+ public static class MyFaultyMapper2<T> implements MapFunction<MyFaulty<T>, MyFaulty<T>> {
+ @Override
+ public MyFaulty<T> map(MyFaulty<T> value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class MyFaultyMapper<T> implements MapFunction<T, MyFaulty<T>> {
+ @Override
+ public MyFaulty<T> map(T value) throws Exception {
+ return null;
+ }
+ }
+
+ @TypeInfo(FaultyTypeInfoFactory.class)
+ public static class MyFaulty<Y> {
+ // empty
+ }
+
+ public static class FaultyTypeInfoFactory extends TypeInfoFactory {
+ @Override
+ public TypeInformation createTypeInfo(Type t, Map genericParameters) {
+ return null;
+ }
+ }
+
+ public static class MyFaultyTypeInfo extends TypeInformation<MyFaulty> {
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 0;
+ }
+
+ @Override
+ public Class<MyFaulty> getTypeClass() {
+ return null;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<MyFaulty> createSerializer(ExecutionConfig config) {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return false;
+ }
+ }
+
+ public static class MyTupleMapperL1<A, B> implements MapFunction<Tuple1<MyTuple<A, String>>, Tuple1<MyTuple<B, A>>> {
+ @Override
+ public Tuple1<MyTuple<B, A>> map(Tuple1<MyTuple<A, String>> value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class MyTupleMapperL2<C> extends MyTupleMapperL1<C, Boolean> {
+ // empty
+ }
+
+ @TypeInfo(MyTupleTypeInfoFactory.class)
+ public static class MyTuple<T0, T1> {
+ // empty
+ }
+
+ public static class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
+ @Override
+ @SuppressWarnings("unchecked")
+ public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+ return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
+ }
+ }
+
+ public static class MyTupleTypeInfo<T0, T1> extends TypeInformation<MyTuple<T0, T1>> {
+ private TypeInformation field0;
+ private TypeInformation field1;
+
+ public TypeInformation getField0() {
+ return field0;
+ }
+
+ public TypeInformation getField1() {
+ return field1;
+ }
+
+ public MyTupleTypeInfo(TypeInformation field0, TypeInformation field1) {
+ this.field0 = field0;
+ this.field1 = field1;
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 0;
+ }
+
+ @Override
+ public Class<MyTuple<T0, T1>> getTypeClass() {
+ return null;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<MyTuple<T0, T1>> createSerializer(ExecutionConfig config) {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return false;
+ }
+
+ @Override
+ public Map<String, TypeInformation<?>> getGenericParameters() {
+ Map<String, TypeInformation<?>> map = new HashMap<>(2);
+ map.put("T0", field0);
+ map.put("T1", field1);
+ return map;
+ }
+ }
+
+ public static class MyOptionMapper<T> implements MapFunction<MyOption<Tuple2<T, String>>, MyOption<Tuple2<T, T>>> {
+ @Override
+ public MyOption<Tuple2<T, T>> map(MyOption<Tuple2<T, String>> value) throws Exception {
+ return null;
+ }
+ }
+
+ @TypeInfo(MyOptionTypeInfoFactory.class)
+ public static class MyOption<T> {
+ // empty
+ }
+
+ public static class MyOptionTypeInfoFactory<T> extends TypeInfoFactory<MyOption<T>> {
+ @Override
+ @SuppressWarnings("unchecked")
+ public TypeInformation<MyOption<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+ return new MyOptionTypeInfo(genericParams.get("T"));
+ }
+ }
+
+ public static class MyOptionTypeInfo<T> extends TypeInformation<MyOption<T>> {
+
+ private final TypeInformation<T> innerType;
+
+ public MyOptionTypeInfo(TypeInformation<T> innerType) {
+ this.innerType = innerType;
+ }
+
+ public TypeInformation<T> getInnerType() {
+ return innerType;
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 0;
+ }
+
+ @Override
+ public Class<MyOption<T>> getTypeClass() {
+ return null;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<MyOption<T>> createSerializer(ExecutionConfig config) {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return false;
+ }
+
+ @Override
+ public Map<String, TypeInformation<?>> getGenericParameters() {
+ Map<String, TypeInformation<?>> map = new HashMap<>(1);
+ map.put("T", innerType);
+ return map;
+ }
+ }
+
+ public static class MyEitherMapper<T> implements MapFunction<T, MyEither<T, String>> {
+ @Override
+ public MyEither<T, String> map(T value) throws Exception {
+ return null;
+ }
+ }
+
+ @TypeInfo(MyEitherTypeInfoFactory.class)
+ public static class MyEither<A, B> {
+ // empty
+ }
+
+ public static class MyEitherTypeInfoFactory<A, B> extends TypeInfoFactory<MyEither<A, B>> {
+ @Override
+ @SuppressWarnings("unchecked")
+ public TypeInformation<MyEither<A,B>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+ return new EitherTypeInfo(genericParams.get("A"), genericParams.get("B"));
+ }
+ }
+
+ @TypeInfo(IntLikeTypeInfoFactory.class)
+ public static class IntLike {
+ // empty
+ }
+
+ public static class IntLikeTypeInfoFactory extends TypeInfoFactory<IntLike> {
+ @Override
+ @SuppressWarnings("unchecked")
+ public TypeInformation<IntLike> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+ return (TypeInformation) INT_TYPE_INFO;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index a8587ef..11d5ec7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -51,6 +51,9 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
case TypeParameter() => TypeParameterDescriptor(id, tpe)
+ // type or super type defines type information factory
+ case FactoryType(baseType) => analyzeFactoryType(id, tpe, baseType)
+
case PrimitiveType(default, wrapper) => PrimitiveDescriptor(id, tpe, default, wrapper)
case BoxedPrimitiveType(default, wrapper, box, unbox) =>
@@ -91,6 +94,19 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
}
}
+ private def analyzeFactoryType(
+ id: Int,
+ tpe: Type,
+ baseType: Type): UDTDescriptor = {
+ val params: Seq[UDTDescriptor] = baseType match {
+ case TypeRef(_, _, args) =>
+ args.map(analyze)
+ case _ =>
+ Seq[UDTDescriptor]()
+ }
+ FactoryTypeDescriptor(id, tpe, baseType, params)
+ }
+
private def analyzeArray(
id: Int,
tpe: Type,
@@ -438,6 +454,15 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
def unapply(tpe: Type): Boolean = tpe <:< typeOf[org.apache.flink.api.java.tuple.Tuple]
}
+ private object FactoryType {
+ def unapply(tpe: Type): Option[Type] = {
+ val definingType = tpe.typeSymbol.asClass.baseClasses find {
+ _.annotations.exists(_.tpe =:= typeOf[org.apache.flink.api.common.typeinfo.TypeInfo])
+ }
+ definingType.map(tpe.baseType)
+ }
+ }
+
private class UDTAnalyzerCache {
private val caches = new DynamicVariable[Map[Type, RecursiveDescriptor]](Map())
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index 4efa546..9efde0f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.codegen
import org.apache.flink.annotation.Internal
+import scala.collection.Map
import scala.language.postfixOps
import scala.reflect.macros.Context
@@ -53,6 +54,13 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
case class TryDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor
+ case class FactoryTypeDescriptor(
+ id: Int,
+ tpe: Type,
+ baseType: Type,
+ params: Seq[UDTDescriptor])
+ extends UDTDescriptor
+
case class OptionDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor
case class BoxedPrimitiveDescriptor(
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index ee0d167..9736e81 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -52,6 +52,9 @@ private[flink] trait TypeInformationGen[C <: Context] {
// We have this for internal use so that we can use it to recursively generate a tree of
// TypeInformation from a tree of UDTDescriptor
def mkTypeInfo[T: c.WeakTypeTag](desc: UDTDescriptor): c.Expr[TypeInformation[T]] = desc match {
+
+ case f: FactoryTypeDescriptor => mkTypeInfoFromFactory(f)
+
case cc@CaseClassDescriptor(_, tpe, _, _, _) =>
mkCaseClassTypeInfo(cc)(c.WeakTypeTag(tpe).asInstanceOf[c.WeakTypeTag[Product]])
.asInstanceOf[c.Expr[TypeInformation[T]]]
@@ -93,6 +96,25 @@ private[flink] trait TypeInformationGen[C <: Context] {
case d => mkGenericTypeInfo(d)
}
+ def mkTypeInfoFromFactory[T: c.WeakTypeTag](desc: FactoryTypeDescriptor)
+ : c.Expr[TypeInformation[T]] = {
+
+ val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
+ val baseClazz = c.Expr[Class[T]](Literal(Constant(desc.baseType)))
+
+ val typeInfos = desc.params map { p => mkTypeInfo(p)(c.WeakTypeTag(p.tpe)).tree }
+ val typeInfosList = c.Expr[List[TypeInformation[_]]](mkList(typeInfos.toList))
+
+ reify {
+ val factory = TypeExtractor.getTypeInfoFactory[T](baseClazz.splice)
+ val genericParameters = typeInfosList.splice
+ .zip(baseClazz.splice.getTypeParameters).map { case (typeInfo, typeParam) =>
+ typeParam.getName -> typeInfo
+ }.toMap[String, TypeInformation[_]]
+ factory.createTypeInfo(tpeClazz.splice, genericParameters.asJava)
+ }
+ }
+
def mkCaseClassTypeInfo[T <: Product : c.WeakTypeTag](
desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index d658fde..2aecd7a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -47,8 +47,10 @@ abstract class CaseClassTypeInfo[T <: Product](
extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
@PublicEvolving
- override def getGenericParameters: java.util.List[TypeInformation[_]] = {
- typeParamTypeInfos.toList.asJava
+ override def getGenericParameters: java.util.Map[String, TypeInformation[_]] = {
+ typeParamTypeInfos.zipWithIndex.map { case (info, index) =>
+ "T" + (index + 1) -> info
+ }.toMap[String, TypeInformation[_]].asJava
}
private val REGEX_INT_FIELD: String = "[0-9]+"
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 406f073..e897309 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -47,7 +47,8 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
@PublicEvolving
override def getTypeClass = clazz
@PublicEvolving
- override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, rightTypeInfo).asJava
+ override def getGenericParameters =
+ Map[String, TypeInformation[_]]("A" -> leftTypeInfo, "B" -> rightTypeInfo).asJava
@PublicEvolving
def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index 92d2704..efc6427 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -46,7 +46,7 @@ class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value]
@PublicEvolving
override def getTypeClass = clazz
@PublicEvolving
- override def getGenericParameters = List.empty[TypeInformation[_]].asJava
+ override def getGenericParameters = Map.empty[String, TypeInformation[_]].asJava
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 58ae77c..73fe580 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -44,7 +44,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
@PublicEvolving
override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
@PublicEvolving
- override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
+ override def getGenericParameters = Map[String, TypeInformation[_]]("A" -> elemTypeInfo).asJava
@PublicEvolving
override def createComparator(ascending: Boolean, executionConfig: ExecutionConfig) = {
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 82fd8ae..47fb039 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -46,7 +46,8 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
@PublicEvolving
override def getTypeClass: Class[T] = clazz
@PublicEvolving
- override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
+ override def getGenericParameters =
+ Map[String, TypeInformation[_]]("A" -> elementTypeInfo).asJava
@PublicEvolving
def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 0a5a06d..b09c353 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -46,7 +46,7 @@ class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
@PublicEvolving
override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
@PublicEvolving
- override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
+ override def getGenericParameters = Map[String, TypeInformation[_]]("T" -> elemTypeInfo).asJava
@PublicEvolving
def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
new file mode 100644
index 0000000..5873630
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import java.lang.reflect.Type
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.api.java.typeutils.{EitherTypeInfo => JavaEitherTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.util.TestLogger
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class TypeInfoFactoryTest extends TestLogger with JUnitSuiteLike {
+
+ @Test
+ def testSimpleType(): Unit = {
+ val ti = createTypeInformation[ScalaIntLike]
+ assertEquals(INT_TYPE_INFO, ti)
+ }
+
+ @Test
+ def testMyTuple(): Unit = {
+ val ti = createTypeInformation[MyTuple[Double, String]]
+ assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+ val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+ assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+ assertEquals(STRING_TYPE_INFO, mtti.getField1)
+ }
+
+ @Test
+ def testMyTupleHierarchy() {
+ val ti = createTypeInformation[MyTuple2]
+ assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+ val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+ assertEquals(STRING_TYPE_INFO, mtti.getField0)
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+
+ val ti2 = createTypeInformation[MyScalaTupleClass]
+ assertTrue(ti2.isInstanceOf[MyTupleTypeInfo[_, _]])
+ val mtti2 = ti2.asInstanceOf[MyTupleTypeInfo[_, _]]
+ assertEquals(STRING_TYPE_INFO, mtti.getField0)
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+ }
+
+ @Test
+ def testMyTupleHierarchyWithCaseClass(): Unit = {
+ val ti = createTypeInformation[MyScalaTupleCaseClass]
+ assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+ val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+ assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+ assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+ }
+
+ @Test
+ def testMyEitherGenericType(): Unit = {
+ val ti = createTypeInformation[MyScalaEither[String, (Double, Int)]]
+ assertTrue(ti.isInstanceOf[JavaEitherTypeInfo[_, _]])
+ val eti = ti.asInstanceOf[JavaEitherTypeInfo[_, _]]
+ assertEquals(STRING_TYPE_INFO, eti.getLeftType)
+ assertTrue(eti.getRightType.isInstanceOf[CaseClassTypeInfo[_]])
+ val cti = eti.getRightType.asInstanceOf[CaseClassTypeInfo[_]]
+ assertEquals(DOUBLE_TYPE_INFO, cti.getTypeAt(0))
+ assertEquals(INT_TYPE_INFO, cti.getTypeAt(1))
+ }
+
+ @Test
+ def testScalaFactory(): Unit = {
+ val ti = createTypeInformation[MyScalaOption[Double]]
+ assertTrue(ti.isInstanceOf[MyScalaOptionTypeInfo])
+ val moti = ti.asInstanceOf[MyScalaOptionTypeInfo]
+ assertEquals(DOUBLE_TYPE_INFO, moti.elementType)
+ }
+}
+
+// --------------------------------------------------------------------------------------------
+// Utilities
+// --------------------------------------------------------------------------------------------
+
+object TypeInfoFactoryTest {
+
+ @TypeInfo(classOf[IntLikeTypeInfoFactory])
+ case class ScalaIntLike(myint: Int)
+
+ class MyScalaTupleClass extends MyTuple2
+
+ case class MyScalaTupleCaseClass(additional: Boolean) extends MyTuple3[Double]
+
+ @TypeInfo(classOf[MyEitherTypeInfoFactory[_, _]])
+ class MyScalaEither[A, B] {
+ // do nothing here
+ }
+
+ @TypeInfo(classOf[MyScalaOptionTypeInfoFactory])
+ class MyScalaOption[Z] {
+ // do nothing here
+ }
+
+ class MyScalaOptionTypeInfoFactory extends TypeInfoFactory[MyOption[_]] {
+
+ override def createTypeInfo(
+ t: Type,
+ genericParameters: util.Map[String, TypeInformation[_]])
+ : TypeInformation[MyOption[_]] = {
+ new MyScalaOptionTypeInfo(genericParameters.get("Z"))
+ }
+ }
+
+ class MyScalaOptionTypeInfo(val elementType: TypeInformation[_])
+ extends TypeInformation[MyOption[_]] {
+
+ override def isBasicType: Boolean = ???
+
+ override def isTupleType: Boolean = ???
+
+ override def getArity: Int = ???
+
+ override def getTotalFields: Int = ???
+
+ override def getTypeClass: Class[MyOption[_]] = ???
+
+ override def isKeyType: Boolean = ???
+
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[MyOption[_]] = ???
+
+ override def canEqual(obj: scala.Any): Boolean = ???
+
+ override def hashCode(): Int = ???
+
+ override def toString: String = ???
+
+ override def equals(obj: scala.Any): Boolean = ???
+ }
+}