You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/21 12:12:44 UTC

flink git commit: [FLINK-3042] [FLINK-3060] [types] Define a way to let types create their own TypeInformation

Repository: flink
Updated Branches:
  refs/heads/master 68709b087 -> 4cc38fd36


[FLINK-3042] [FLINK-3060] [types] Define a way to let types create their own TypeInformation

This closes #2337.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cc38fd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cc38fd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cc38fd3

Branch: refs/heads/master
Commit: 4cc38fd36f3190f9c0066e9cf94580669b2410cf
Parents: 68709b0
Author: twalthr <tw...@apache.org>
Authored: Thu Aug 4 17:01:08 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Wed Sep 21 14:12:04 2016 +0200

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  42 ++
 .../flink/api/common/typeinfo/TypeInfo.java     |  44 ++
 .../api/common/typeinfo/TypeInfoFactory.java    |  51 ++
 .../api/common/typeinfo/TypeInformation.java    |  24 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  13 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 260 ++++++++--
 .../api/java/typeutils/TypeInfoFactoryTest.java | 469 +++++++++++++++++++
 .../flink/api/scala/codegen/TypeAnalyzer.scala  |  25 +
 .../api/scala/codegen/TypeDescriptors.scala     |   8 +
 .../api/scala/codegen/TypeInformationGen.scala  |  22 +
 .../api/scala/typeutils/CaseClassTypeInfo.scala |   6 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |   3 +-
 .../api/scala/typeutils/EnumValueTypeInfo.scala |   2 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |   2 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |   3 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   2 +-
 .../scala/typeutils/TypeInfoFactoryTest.scala   | 157 +++++++
 17 files changed, 1079 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 364aeb8..8a32491 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -251,3 +251,45 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
 {% endhighlight %}
 
 There are different variants of these methods available.
+
+## Defining Type Information using a Factory
+
+A type information factory allows for plugging-in user-defined type information into the Flink type system.
+You have to implement `org.apache.flink.api.common.typeinfo.TypeInfoFactory` to return your custom type information. 
+The factory is called during the type extraction phase if the corresponding type has been annotated 
+with the `@org.apache.flink.api.common.typeinfo.TypeInfo` annotation. 
+
+Type information factories can be used in both the Java and Scala API.
+
+In a hierarchy of types the closest factory 
+will be chosen while traversing upwards, however, a built-in factory has highest precedence. A factory has 
+also higher precendence than Flink's built-in types, therefore you should know what you are doing.
+
+The following example shows how to annotate a custom type `MyTuple` and supply custom type information for it using a factory in Java.
+
+The annotated custom type:
+{% highlight java %}
+@TypeInfo(MyTupleTypeInfoFactory.class)
+public class MyTuple<T0, T1> {
+  public T0 myfield0;
+  public T1 myfield1;
+}
+{% endhighlight %}
+
+The factory supplying custom type information:
+{% highlight java %}
+public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
+
+  @Override
+  public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+    return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
+  }
+}
+{% endhighlight %}
+
+The method `createTypeInfo(Type, Map<String, TypeInformation<?>>)` creates type information for the type the factory is targeted for. 
+The parameters provide additional information about the type itself as well as the type's generic type parameters if available.
+
+If your type contains generic parameters that might need to be derived from the input type of a Flink function, make sure to also 
+implement `org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters` for a bidirectional mapping of generic 
+parameters to type information.

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
new file mode 100644
index 0000000..ce46827
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Type;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Annotation for specifying a corresponding {@link TypeInfoFactory} that can produce
+ * {@link TypeInformation} for the annotated type. In a hierarchy of types the closest annotation
+ * that defines a factory will be chosen while traversing upwards, however, a globally registered
+ * factory has highest precedence (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Public
+public @interface TypeInfo {
+
+	Class<? extends TypeInfoFactory> value();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..ea15f3a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Base class for implementing a type information factory. A type information factory allows for
+ * plugging-in user-defined {@link TypeInformation} into the Flink type system. The factory is
+ * called during the type extraction phase if the corresponding type has been annotated with
+ * {@link TypeInfo}. In a hierarchy of types the closest factory will be chosen while traversing
+ * upwards, however, a globally registered factory has highest precedence
+ * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ *
+ * @param <T> type for which {@link TypeInformation} is created
+ */
+@Public
+public abstract class TypeInfoFactory<T> {
+
+	/**
+	 * Creates type information for the type the factory is targeted for. The parameters provide
+	 * additional information about the type itself as well as the type's generic type parameters.
+	 *
+	 * @param t the exact type the type information is created for; might also be a subclass of &lt;T&gt;
+	 * @param genericParameters mapping of the type's generic type parameters to type information
+	 *                          extracted with Flink's type extraction facilities; null values
+	 *                          indicate that type information could not be extracted for this parameter
+	 * @return type information for the type the factory is targeted for
+	 */
+	public abstract TypeInformation<T> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 95eed6b..154ceb1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import java.util.Map;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * TypeInformation is the core class of Flink's type system. Flink requires a type information
@@ -122,14 +123,25 @@ public abstract class TypeInformation<T> implements Serializable {
 	public abstract Class<T> getTypeClass();
 
 	/**
-	 * Returns the generic parameters of this type.
+	 * Optional method for giving Flink's type extraction system information about the mapping
+	 * of a generic type parameter to the type information of a subtype. This information is necessary
+	 * in cases where type information should be deduced from an input type.
 	 *
-	 * @return The list of generic parameters. This list can be empty.
+	 * For instance, a method for a {@link Tuple2} would look like this:
+	 * <code>
+	 * Map m = new HashMap();
+	 * m.put("T0", this.getTypeAt(0));
+	 * m.put("T1", this.getTypeAt(1));
+	 * return m;
+	 * </code>
+	 *
+	 * @return map of inferred subtypes; it does not have to contain all generic parameters as key;
+	 *         values may be null if type could not be inferred
 	 */
 	@PublicEvolving
-	public List<TypeInformation<?>> getGenericParameters() {
-		// Return an empty list as the default implementation
-		return Collections.emptyList();
+	public Map<String, TypeInformation<?>> getGenericParameters() {
+		// return an empty map as the default implementation
+		return Collections.emptyMap();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index d525ffb..e2cd789 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -169,7 +171,16 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 			);
 		}
 	}
-	
+
+	@Override
+	public Map<String, TypeInformation<?>> getGenericParameters() {
+		Map<String, TypeInformation<?>> m = new HashMap<>(types.length);
+		for (int i = 0; i < types.length; i++) {
+			m.put("T" + i, types[i]);
+		}
+		return m;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a722d72..a0b09f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -28,9 +28,12 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.avro.specific.SpecificRecordBase;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -56,6 +59,8 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -63,7 +68,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
-
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +116,34 @@ public class TypeExtractor {
 	}
 
 	// --------------------------------------------------------------------------------------------
+	//  TypeInfoFactory registry
+	// --------------------------------------------------------------------------------------------
+
+	private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
+
+	/**
+	 * Registers a type information factory globally for a certain type. Every following type extraction
+	 * operation will use the provided factory for this type. The factory will have highest precedence
+	 * for this type. In a hierarchy of types the registered factory has higher precedence than annotations
+	 * at the same level but lower precedence than factories defined down the hierarchy.
+	 *
+	 * @param t type for which a new factory is registered
+	 * @param factory type information factory that will produce {@link TypeInformation}
+	 */
+	private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
+		Preconditions.checkNotNull(t, "Type parameter must not be null.");
+		Preconditions.checkNotNull(factory, "Factory parameter must not be null.");
+
+		if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
+			throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
+		}
+		if (registeredTypeInfoFactories.containsKey(t)) {
+			throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
+		}
+		registeredTypeInfoFactories.put(t, factory);
+	}
+
+	// --------------------------------------------------------------------------------------------
 	//  Function specific methods
 	// --------------------------------------------------------------------------------------------
 
@@ -592,9 +626,14 @@ public class TypeExtractor {
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		
+
+		// check if type information can be created using a type factory
+		final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(t, typeHierarchy, in1Type, in2Type);
+		if (typeFromFactory != null) {
+			return typeFromFactory;
+		}
 		// check if type is a subclass of tuple
-		if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
+		else if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
 			Type curT = t;
 			
 			// do not allow usage of Tuple as type
@@ -622,7 +661,7 @@ public class TypeExtractor {
 			typeHierarchy.add(curT);
 
 			// create the type information for the subtypes
-			TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
 			// type needs to be treated a pojo due to additional fields
 			if (subTypesInfo == null) {
 				if (t instanceof ParameterizedType) {
@@ -655,7 +694,7 @@ public class TypeExtractor {
 			typeHierarchy.add(curT);
 
 			// create the type information for the subtypes
-			TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
 			// type needs to be treated a pojo due to additional fields
 			if (subTypesInfo == null) {
 				if (t instanceof ParameterizedType) {
@@ -807,12 +846,40 @@ public class TypeExtractor {
 
 		return null;
 	}
-	
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
 		TypeInformation<?> info = null;
-		
+
+		// use a factory to find corresponding type information to type variable
+		final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
+		final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
+		if (factory != null) {
+			// the type that defines the factory is last in factory hierarchy
+			final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);
+			// defining type has generics, the factory need to be asked for a mapping of subtypes to type information
+			if (factoryDefiningType instanceof ParameterizedType) {
+				final Type[] typeParams = typeToClass(factoryDefiningType).getTypeParameters();
+				final Type[] actualParams = ((ParameterizedType) factoryDefiningType).getActualTypeArguments();
+				// go thru all elements and search for type variables
+				for (int i = 0; i < actualParams.length; i++) {
+					final Map<String, TypeInformation<?>> componentInfo = inTypeInfo.getGenericParameters();
+					final String typeParamName = typeParams[i].toString();
+					if (!componentInfo.containsKey(typeParamName) || componentInfo.get(typeParamName) == null) {
+						throw new InvalidTypesException("TypeInformation '" + inTypeInfo.getClass().getSimpleName() +
+							"' does not supply a mapping of TypeVariable '" + typeParamName + "' to corresponding TypeInformation. " +
+							"Input type inference can only produce a result with this information. " +
+							"Please implement method 'TypeInformation.getGenericParameters()' for this.");
+					}
+					info = createTypeInfoFromInput(returnTypeVar, factoryHierarchy, actualParams[i], componentInfo.get(typeParamName));
+					if (info != null) {
+						break;
+					}
+				}
+			}
+		}
 		// the input is a type variable
-		if (inType instanceof TypeVariable) {
+		else if (inType instanceof TypeVariable) {
 			inType = materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) inType);
 			info = findCorrespondingInfo(returnTypeVar, inType, inTypeInfo, inputTypeHierarchy);
 		}
@@ -873,28 +940,30 @@ public class TypeExtractor {
 	 * @param typeHierarchy necessary for type inference
 	 * @param in1Type necessary for type inference
 	 * @param in2Type necessary for type inference
+	 * @param lenient decides whether exceptions should be thrown if a subtype can not be determined
 	 * @return array containing TypeInformation of sub types or null if definingType contains
 	 *     more subtypes (fields) that defined
 	 */
 	private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
-			ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+			ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, boolean lenient) {
 		Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
 
 		// materialize possible type variables
 		for (int i = 0; i < subtypes.length; i++) {
+			final Type actualTypeArg = definingType.getActualTypeArguments()[i];
 			// materialize immediate TypeVariables
-			if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
-				subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]);
+			if (actualTypeArg instanceof TypeVariable<?>) {
+				subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) actualTypeArg);
 			}
 			// class or parameterized type
 			else {
-				subtypes[i] = definingType.getActualTypeArguments()[i];
+				subtypes[i] = actualTypeArg;
 			}
 		}
 
 		TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
 		for (int i = 0; i < subtypes.length; i++) {
-			ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+			final ArrayList<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
 			subTypeHierarchy.add(subtypes[i]);
 			// sub type could not be determined with materializing
 			// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
@@ -902,7 +971,7 @@ public class TypeExtractor {
 				subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
 
 				// variable could not be determined
-				if (subTypesInfo[i] == null) {
+				if (subTypesInfo[i] == null && !lenient) {
 					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
 							+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
 							+ "' could not be determined. This is most likely a type erasure problem. "
@@ -910,25 +979,75 @@ public class TypeExtractor {
 							+ "all variables in the return type can be deduced from the input type(s).");
 				}
 			} else {
-				subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+				// create the type information of the subtype or null/exception
+				try {
+					subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+				} catch (InvalidTypesException e) {
+					if (lenient) {
+						subTypesInfo[i] = null;
+					} else {
+						throw e;
+					}
+				}
 			}
 		}
 
-		Class<?> originalTypeAsClass = null;
-		if (isClassType(originalType)) {
-			originalTypeAsClass = typeToClass(originalType);
+		// check that number of fields matches the number of subtypes
+		if (!lenient) {
+			Class<?> originalTypeAsClass = null;
+			if (isClassType(originalType)) {
+				originalTypeAsClass = typeToClass(originalType);
+			}
+			checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+			// check if the class we assumed to conform to the defining type so far is actually a pojo because the
+			// original type contains additional fields.
+			// check for additional fields.
+			int fieldCount = countFieldsInClass(originalTypeAsClass);
+			if(fieldCount > subTypesInfo.length) {
+				return null;
+			}
 		}
-		checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
-		// check if the class we assumed to conform to the defining type so far is actually a pojo because the
-		// original type contains additional fields.
-		// check for additional fields.
-		int fieldCount = countFieldsInClass(originalTypeAsClass);
-		if(fieldCount > subTypesInfo.length) {
+
+		return subTypesInfo;
+	}
+
+	/**
+	 * Creates type information using a factory if for this type or super types. Returns null otherwise.
+	 */
+	@SuppressWarnings("unchecked")
+	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
+			Type t, ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+
+		final ArrayList<Type> factoryHierarchy = new ArrayList<>(typeHierarchy);
+		final TypeInfoFactory<? super OUT> factory = getClosestFactory(factoryHierarchy, t);
+		if (factory == null) {
 			return null;
 		}
-		return subTypesInfo;
+		final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1);
+
+		// infer possible type parameters from input
+		final Map<String, TypeInformation<?>> genericParams;
+		if (factoryDefiningType instanceof ParameterizedType) {
+			genericParams = new HashMap<>();
+			final ParameterizedType paramDefiningType = (ParameterizedType) factoryDefiningType;
+			final Type[] args = typeToClass(paramDefiningType).getTypeParameters();
+
+			final TypeInformation<?>[] subtypeInfo = createSubTypesInfo(t, paramDefiningType, factoryHierarchy, in1Type, in2Type, true);
+			assert subtypeInfo != null;
+			for (int i = 0; i < subtypeInfo.length; i++) {
+				genericParams.put(args[i].toString(), subtypeInfo[i]);
+			}
+		} else {
+			genericParams = Collections.emptyMap();
+		}
+
+		final TypeInformation<OUT> createdTypeInfo = (TypeInformation<OUT>) factory.createTypeInfo(t, genericParams);
+		if (createdTypeInfo == null) {
+			throw new InvalidTypesException("TypeInfoFactory returned invalid TypeInformation 'null'");
+		}
+		return createdTypeInfo;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Extract type parameters
 	// --------------------------------------------------------------------------------------------
@@ -1254,6 +1373,31 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 
 	/**
+	 * Returns the type information factory for a type using the factory registry or annotations.
+	 */
+	@Internal
+	public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
+		final Class<?> factoryClass;
+		if (registeredTypeInfoFactories.containsKey(t)) {
+			factoryClass = registeredTypeInfoFactories.get(t);
+		}
+		else {
+			if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
+				return null;
+			}
+			final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
+			factoryClass = typeInfoAnnotation.value();
+			// check for valid factory class
+			if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
+				throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
+			}
+		}
+
+		// instantiate
+		return (TypeInfoFactory<OUT>) InstantiationUtil.instantiate(factoryClass);
+	}
+
+	/**
 	 * @return number of items with equal type or same raw type
 	 */
 	private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
@@ -1265,27 +1409,46 @@ public class TypeExtractor {
 		}
 		return count;
 	}
-	
+
 	/**
-	 * @param curT : start type
-	 * @return Type The immediate child of the top class
+	 * Traverses the type hierarchy of a type up until a certain stop class is found.
+	 *
+	 * @param t type for which a hierarchy need to be created
+	 * @return type of the immediate child of the stop class
 	 */
-	private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
-		// skip first one
-		if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
-			curT = typeToClass(curT).getGenericSuperclass();
+	private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
+		while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
+			typeHierarchy.add(t);
+			t = typeToClass(t).getGenericSuperclass();
+
+			if (t == null) {
+				break;
+			}
 		}
-		while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
-			typeHierarchy.add(curT);
-			curT = typeToClass(curT).getGenericSuperclass();
+		return t;
+	}
 
-			if (curT == null) {
+	/**
+	 * Traverses the type hierarchy up until a type information factory can be found.
+	 *
+	 * @param typeHierarchy hierarchy to be filled while traversing up
+	 * @param t type for which a factory needs to be found
+	 * @return closest type information factory or null if there is no factory in the type hierarchy
+	 */
+	private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
+		TypeInfoFactory factory = null;
+		while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
+			typeHierarchy.add(t);
+			factory = getTypeInfoFactory(t);
+			t = typeToClass(t).getGenericSuperclass();
+
+			if (t == null) {
 				break;
 			}
 		}
-		return curT;
+		return factory;
 	}
-	
+
 	private int countFieldsInClass(Class<?> clazz) {
 		int fieldCount = 0;
 		for(Field field : clazz.getFields()) { // get all fields
@@ -1486,17 +1649,26 @@ public class TypeExtractor {
 	 * @return TypeInformation that describes the passed Class
 	 */
 	public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
-		return new TypeExtractor().privateGetForClass(clazz, new ArrayList<Type>());
+		final ArrayList<Type> typeHierarchy = new ArrayList<>();
+		typeHierarchy.add(clazz);
+		return new TypeExtractor().privateGetForClass(clazz, typeHierarchy);
 	}
 	
 	private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
 		return privateGetForClass(clazz, typeHierarchy, null, null, null);
 	}
+
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 		checkNotNull(clazz);
 
+		// check if type information can be produced using a factory
+		final TypeInformation<OUT> typeFromFactory = createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
+		if (typeFromFactory != null) {
+			return typeFromFactory;
+		}
+
 		// Object is handled as generic type info
 		if (clazz.equals(Object.class)) {
 			return new GenericTypeInfo<>(clazz);
@@ -1859,6 +2031,14 @@ public class TypeExtractor {
 	private <X> TypeInformation<X> privateGetForObject(X value) {
 		checkNotNull(value);
 
+		// check if type information can be produced using a factory
+		final ArrayList<Type> typeHierarchy = new ArrayList<>();
+		typeHierarchy.add(value.getClass());
+		final TypeInformation<X> typeFromFactory = createTypeInfoFromFactory(value.getClass(), typeHierarchy, null, null);
+		if (typeFromFactory != null) {
+			return typeFromFactory;
+		}
+
 		// check if we can extract the types from tuples, otherwise work with the class
 		if (value instanceof Tuple) {
 			Tuple t = (Tuple) value;

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
new file mode 100644
index 0000000..f055879
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for extracting {@link org.apache.flink.api.common.typeinfo.TypeInformation} from types
+ * using a {@link org.apache.flink.api.common.typeinfo.TypeInfoFactory}
+ */
+public class TypeInfoFactoryTest {
+
+	@Test
+	public void testSimpleType() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(IntLike.class);
+		assertEquals(INT_TYPE_INFO, ti);
+
+		ti = TypeExtractor.getForClass(IntLike.class);
+		assertEquals(INT_TYPE_INFO, ti);
+
+		ti = TypeExtractor.getForObject(new IntLike());
+		assertEquals(INT_TYPE_INFO, ti);
+	}
+
+	@Test
+	public void testMyEitherGenericType() {
+		MapFunction<Boolean, MyEither<Boolean, String>> f = new MyEitherMapper<>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BOOLEAN_TYPE_INFO);
+		assertTrue(ti instanceof EitherTypeInfo);
+		EitherTypeInfo eti = (EitherTypeInfo) ti;
+		assertEquals(BOOLEAN_TYPE_INFO, eti.getLeftType());
+		assertEquals(STRING_TYPE_INFO, eti.getRightType());
+	}
+
+	@Test
+	public void testMyOptionGenericType() {
+		TypeInformation<MyOption<Tuple2<Boolean, String>>> inTypeInfo = new MyOptionTypeInfo<>(
+			new TupleTypeInfo<Tuple2<Boolean, String>>(BOOLEAN_TYPE_INFO, STRING_TYPE_INFO));
+		MapFunction<MyOption<Tuple2<Boolean, String>>, MyOption<Tuple2<Boolean, Boolean>>> f = new MyOptionMapper<>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+		assertTrue(ti instanceof MyOptionTypeInfo);
+		MyOptionTypeInfo oti = (MyOptionTypeInfo) ti;
+		assertTrue(oti.getInnerType() instanceof TupleTypeInfo);
+		TupleTypeInfo tti = (TupleTypeInfo) oti.getInnerType();
+		assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(0));
+		assertEquals(BOOLEAN_TYPE_INFO, tti.getTypeAt(1));
+	}
+
+	@Test
+	public void testMyTuple() {
+		TypeInformation<Tuple1<MyTuple<Double, String>>> inTypeInfo = new TupleTypeInfo<>(
+			new MyTupleTypeInfo(DOUBLE_TYPE_INFO, STRING_TYPE_INFO));
+		MapFunction<Tuple1<MyTuple<Double, String>>, Tuple1<MyTuple<Boolean, Double>>> f = new MyTupleMapperL2<>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+		assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+		MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+		assertEquals(BOOLEAN_TYPE_INFO, mtti.getField0());
+		assertEquals(DOUBLE_TYPE_INFO, mtti.getField1());
+	}
+
+	@Test
+	public void testMyTupleHierarchy() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyTuple2.class);
+		assertTrue(ti instanceof MyTupleTypeInfo);
+		MyTupleTypeInfo<?, ?> mtti = (MyTupleTypeInfo) ti;
+		assertEquals(STRING_TYPE_INFO, mtti.getField0());
+		assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+	}
+
+	@Test
+	public void testMyTupleHierarchyWithInference() {
+		TypeInformation<Tuple1<MyTuple3<Tuple1<Float>>>> inTypeInfo = new TupleTypeInfo<>(new MyTupleTypeInfo<>(
+			new TupleTypeInfo<Tuple1<Float>>(FLOAT_TYPE_INFO), BOOLEAN_TYPE_INFO));
+		MapFunction<Tuple1<MyTuple3<Tuple1<Float>>>, Tuple1<MyTuple3<Tuple2<Float, String>>>> f = new MyTuple3Mapper<>();
+		TypeInformation ti = TypeExtractor.getMapReturnTypes(f, inTypeInfo);
+		assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = (TupleTypeInfo) ti;
+		assertTrue(tti.getTypeAt(0) instanceof MyTupleTypeInfo);
+		MyTupleTypeInfo mtti = (MyTupleTypeInfo) tti.getTypeAt(0);
+		assertEquals(new TupleTypeInfo<>(FLOAT_TYPE_INFO, STRING_TYPE_INFO), mtti.getField0());
+		assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1());
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testMissingTypeInfo() {
+		MapFunction f = new MyFaultyMapper();
+		TypeExtractor.getMapReturnTypes(f, INT_TYPE_INFO);
+	}
+
+	@Test(expected = InvalidTypesException.class)
+	public void testMissingTypeInference() {
+		MapFunction f = new MyFaultyMapper2();
+		TypeExtractor.getMapReturnTypes(f, new MyFaultyTypeInfo());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Utilities
+	// --------------------------------------------------------------------------------------------
+
+	public static class MyTuple3Mapper<Y> implements MapFunction<Tuple1<MyTuple3<Tuple1<Y>>>, Tuple1<MyTuple3<Tuple2<Y, String>>>> {
+		@Override
+		public Tuple1<MyTuple3<Tuple2<Y, String>>> map(Tuple1<MyTuple3<Tuple1<Y>>> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MyTuple3<T> extends MyTuple<T, Boolean> {
+		// empty
+	}
+
+	public static class MyTuple2 extends MyTuple<String, Boolean> {
+		// empty
+	}
+
+	public static class MyFaultyMapper2<T> implements MapFunction<MyFaulty<T>, MyFaulty<T>> {
+		@Override
+		public MyFaulty<T> map(MyFaulty<T> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MyFaultyMapper<T> implements MapFunction<T, MyFaulty<T>> {
+		@Override
+		public MyFaulty<T> map(T value) throws Exception {
+			return null;
+		}
+	}
+
+	@TypeInfo(FaultyTypeInfoFactory.class)
+	public static class MyFaulty<Y> {
+		// empty
+	}
+
+	public static class FaultyTypeInfoFactory extends TypeInfoFactory {
+		@Override
+		public TypeInformation createTypeInfo(Type t, Map genericParameters) {
+			return null;
+		}
+	}
+
+	public static class MyFaultyTypeInfo extends TypeInformation<MyFaulty> {
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 0;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 0;
+		}
+
+		@Override
+		public Class<MyFaulty> getTypeClass() {
+			return null;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<MyFaulty> createSerializer(ExecutionConfig config) {
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return false;
+		}
+	}
+
+	public static class MyTupleMapperL1<A, B> implements MapFunction<Tuple1<MyTuple<A, String>>, Tuple1<MyTuple<B, A>>> {
+		@Override
+		public Tuple1<MyTuple<B, A>> map(Tuple1<MyTuple<A, String>> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MyTupleMapperL2<C> extends MyTupleMapperL1<C, Boolean> {
+		// empty
+	}
+
+	@TypeInfo(MyTupleTypeInfoFactory.class)
+	public static class MyTuple<T0, T1> {
+		// empty
+	}
+
+	public static class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+			return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
+		}
+	}
+
+	public static class MyTupleTypeInfo<T0, T1> extends TypeInformation<MyTuple<T0, T1>> {
+		private TypeInformation field0;
+		private TypeInformation field1;
+
+		public TypeInformation getField0() {
+			return field0;
+		}
+
+		public TypeInformation getField1() {
+			return field1;
+		}
+
+		public MyTupleTypeInfo(TypeInformation field0, TypeInformation field1) {
+			this.field0 = field0;
+			this.field1 = field1;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 0;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 0;
+		}
+
+		@Override
+		public Class<MyTuple<T0, T1>> getTypeClass() {
+			return null;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<MyTuple<T0, T1>> createSerializer(ExecutionConfig config) {
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return false;
+		}
+
+		@Override
+		public Map<String, TypeInformation<?>> getGenericParameters() {
+			Map<String, TypeInformation<?>> map = new HashMap<>(2);
+			map.put("T0", field0);
+			map.put("T1", field1);
+			return map;
+		}
+	}
+
+	public static class MyOptionMapper<T> implements MapFunction<MyOption<Tuple2<T, String>>, MyOption<Tuple2<T, T>>> {
+		@Override
+		public MyOption<Tuple2<T, T>> map(MyOption<Tuple2<T, String>> value) throws Exception {
+			return null;
+		}
+	}
+
+	@TypeInfo(MyOptionTypeInfoFactory.class)
+	public static class MyOption<T> {
+		// empty
+	}
+
+	public static class MyOptionTypeInfoFactory<T> extends TypeInfoFactory<MyOption<T>> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public TypeInformation<MyOption<T>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+			return new MyOptionTypeInfo(genericParams.get("T"));
+		}
+	}
+
+	public static class MyOptionTypeInfo<T> extends TypeInformation<MyOption<T>> {
+
+		private final TypeInformation<T> innerType;
+
+		public MyOptionTypeInfo(TypeInformation<T> innerType) {
+			this.innerType = innerType;
+		}
+
+		public TypeInformation<T> getInnerType() {
+			return innerType;
+		}
+
+		@Override
+		public boolean isBasicType() {
+			return false;
+		}
+
+		@Override
+		public boolean isTupleType() {
+			return false;
+		}
+
+		@Override
+		public int getArity() {
+			return 0;
+		}
+
+		@Override
+		public int getTotalFields() {
+			return 0;
+		}
+
+		@Override
+		public Class<MyOption<T>> getTypeClass() {
+			return null;
+		}
+
+		@Override
+		public boolean isKeyType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<MyOption<T>> createSerializer(ExecutionConfig config) {
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return 0;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return false;
+		}
+
+		@Override
+		public Map<String, TypeInformation<?>> getGenericParameters() {
+			Map<String, TypeInformation<?>> map = new HashMap<>(1);
+			map.put("T", innerType);
+			return map;
+		}
+	}
+
+	public static class MyEitherMapper<T> implements MapFunction<T, MyEither<T, String>> {
+		@Override
+		public MyEither<T, String> map(T value) throws Exception {
+			return null;
+		}
+	}
+
+	@TypeInfo(MyEitherTypeInfoFactory.class)
+	public static class MyEither<A, B> {
+		// empty
+	}
+
+	public static class MyEitherTypeInfoFactory<A, B> extends TypeInfoFactory<MyEither<A, B>> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public TypeInformation<MyEither<A,B>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+			return new EitherTypeInfo(genericParams.get("A"), genericParams.get("B"));
+		}
+	}
+
+	@TypeInfo(IntLikeTypeInfoFactory.class)
+	public static class IntLike {
+		// empty
+	}
+
+	public static class IntLikeTypeInfoFactory extends TypeInfoFactory<IntLike> {
+		@Override
+		@SuppressWarnings("unchecked")
+		public TypeInformation<IntLike> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParams) {
+			return (TypeInformation) INT_TYPE_INFO;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index a8587ef..11d5ec7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -51,6 +51,9 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
 
           case TypeParameter() => TypeParameterDescriptor(id, tpe)
 
+          // type or super type defines type information factory
+          case FactoryType(baseType) => analyzeFactoryType(id, tpe, baseType)
+
           case PrimitiveType(default, wrapper) => PrimitiveDescriptor(id, tpe, default, wrapper)
 
           case BoxedPrimitiveType(default, wrapper, box, unbox) =>
@@ -91,6 +94,19 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       }
     }
 
+    private def analyzeFactoryType(
+        id: Int,
+        tpe: Type,
+        baseType: Type): UDTDescriptor = {
+      val params: Seq[UDTDescriptor] = baseType match {
+        case TypeRef(_, _, args) =>
+          args.map(analyze)
+        case _ =>
+          Seq[UDTDescriptor]()
+      }
+      FactoryTypeDescriptor(id, tpe, baseType, params)
+    }
+
     private def analyzeArray(
         id: Int,
         tpe: Type,
@@ -438,6 +454,15 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
       def unapply(tpe: Type): Boolean = tpe <:< typeOf[org.apache.flink.api.java.tuple.Tuple]
     }
 
+    private object FactoryType {
+      def unapply(tpe: Type): Option[Type] = {
+        val definingType = tpe.typeSymbol.asClass.baseClasses find {
+          _.annotations.exists(_.tpe =:= typeOf[org.apache.flink.api.common.typeinfo.TypeInfo])
+        }
+        definingType.map(tpe.baseType)
+      }
+    }
+
     private class UDTAnalyzerCache {
 
       private val caches = new DynamicVariable[Map[Type, RecursiveDescriptor]](Map())

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
index 4efa546..9efde0f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeDescriptors.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.codegen
 
 import org.apache.flink.annotation.Internal
 
+import scala.collection.Map
 import scala.language.postfixOps
 import scala.reflect.macros.Context
 
@@ -53,6 +54,13 @@ private[flink] trait TypeDescriptors[C <: Context] { this: MacroContextHolder[C]
 
   case class TryDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor
 
+  case class FactoryTypeDescriptor(
+      id: Int,
+      tpe: Type,
+      baseType: Type,
+      params: Seq[UDTDescriptor])
+    extends UDTDescriptor
+
   case class OptionDescriptor(id: Int, tpe: Type, elem: UDTDescriptor) extends UDTDescriptor
 
   case class BoxedPrimitiveDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index ee0d167..9736e81 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -52,6 +52,9 @@ private[flink] trait TypeInformationGen[C <: Context] {
   // We have this for internal use so that we can use it to recursively generate a tree of
   // TypeInformation from a tree of UDTDescriptor
   def mkTypeInfo[T: c.WeakTypeTag](desc: UDTDescriptor): c.Expr[TypeInformation[T]] = desc match {
+
+    case f: FactoryTypeDescriptor => mkTypeInfoFromFactory(f)
+
     case cc@CaseClassDescriptor(_, tpe, _, _, _) =>
       mkCaseClassTypeInfo(cc)(c.WeakTypeTag(tpe).asInstanceOf[c.WeakTypeTag[Product]])
         .asInstanceOf[c.Expr[TypeInformation[T]]]
@@ -93,6 +96,25 @@ private[flink] trait TypeInformationGen[C <: Context] {
     case d => mkGenericTypeInfo(d)
   }
 
+  def mkTypeInfoFromFactory[T: c.WeakTypeTag](desc: FactoryTypeDescriptor)
+    : c.Expr[TypeInformation[T]] = {
+
+    val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))
+    val baseClazz = c.Expr[Class[T]](Literal(Constant(desc.baseType)))
+
+    val typeInfos = desc.params map { p => mkTypeInfo(p)(c.WeakTypeTag(p.tpe)).tree }
+    val typeInfosList = c.Expr[List[TypeInformation[_]]](mkList(typeInfos.toList))
+
+    reify {
+      val factory = TypeExtractor.getTypeInfoFactory[T](baseClazz.splice)
+      val genericParameters = typeInfosList.splice
+        .zip(baseClazz.splice.getTypeParameters).map { case (typeInfo, typeParam) =>
+          typeParam.getName -> typeInfo
+        }.toMap[String, TypeInformation[_]]
+      factory.createTypeInfo(tpeClazz.splice, genericParameters.asJava)
+    }
+  }
+
   def mkCaseClassTypeInfo[T <: Product : c.WeakTypeTag](
       desc: CaseClassDescriptor): c.Expr[TypeInformation[T]] = {
     val tpeClazz = c.Expr[Class[T]](Literal(Constant(desc.tpe)))

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index d658fde..2aecd7a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -47,8 +47,10 @@ abstract class CaseClassTypeInfo[T <: Product](
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
   @PublicEvolving
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = {
-    typeParamTypeInfos.toList.asJava
+  override def getGenericParameters: java.util.Map[String, TypeInformation[_]] = {
+    typeParamTypeInfos.zipWithIndex.map { case (info, index) =>
+      "T" + (index + 1) -> info
+    }.toMap[String, TypeInformation[_]].asJava
   }
 
   private val REGEX_INT_FIELD: String = "[0-9]+"

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index 406f073..e897309 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -47,7 +47,8 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
   @PublicEvolving
   override def getTypeClass = clazz
   @PublicEvolving
-  override def getGenericParameters = List[TypeInformation[_]](leftTypeInfo, rightTypeInfo).asJava
+  override def getGenericParameters =
+    Map[String, TypeInformation[_]]("A" -> leftTypeInfo, "B" -> rightTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index 92d2704..efc6427 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -46,7 +46,7 @@ class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value]
   @PublicEvolving
   override def getTypeClass = clazz
   @PublicEvolving
-  override def getGenericParameters = List.empty[TypeInformation[_]].asJava
+  override def getGenericParameters = Map.empty[String, TypeInformation[_]].asJava
 
 
   @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 58ae77c..73fe580 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -44,7 +44,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
   @PublicEvolving
   override def getTypeClass = classOf[Option[_]].asInstanceOf[Class[T]]
   @PublicEvolving
-  override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
+  override def getGenericParameters = Map[String, TypeInformation[_]]("A" -> elemTypeInfo).asJava
 
   @PublicEvolving
   override def createComparator(ascending: Boolean, executionConfig: ExecutionConfig) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 82fd8ae..47fb039 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -46,7 +46,8 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
   @PublicEvolving
   override def getTypeClass: Class[T] = clazz
   @PublicEvolving
-  override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
+  override def getGenericParameters =
+    Map[String, TypeInformation[_]]("A" -> elementTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 0a5a06d..b09c353 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -46,7 +46,7 @@ class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
   @PublicEvolving
   override def getTypeClass = classOf[Try[_]].asInstanceOf[Class[T]]
   @PublicEvolving
-  override def getGenericParameters = List[TypeInformation[_]](elemTypeInfo).asJava
+  override def getGenericParameters = Map[String, TypeInformation[_]]("T" -> elemTypeInfo).asJava
 
   @PublicEvolving
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc38fd3/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
new file mode 100644
index 0000000..5873630
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TypeInfoFactoryTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import java.lang.reflect.Type
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.api.java.typeutils.{EitherTypeInfo => JavaEitherTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TypeInfoFactoryTest._
+import org.apache.flink.util.TestLogger
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class TypeInfoFactoryTest  extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testSimpleType(): Unit = {
+    val ti = createTypeInformation[ScalaIntLike]
+    assertEquals(INT_TYPE_INFO, ti)
+  }
+
+  @Test
+  def testMyTuple(): Unit = {
+    val ti = createTypeInformation[MyTuple[Double, String]]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+    assertEquals(STRING_TYPE_INFO, mtti.getField1)
+  }
+
+  @Test
+  def testMyTupleHierarchy() {
+    val ti = createTypeInformation[MyTuple2]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, mtti.getField0)
+    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+
+    val ti2 = createTypeInformation[MyScalaTupleClass]
+    assertTrue(ti2.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti2 = ti2.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, mtti.getField0)
+    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+  }
+
+  @Test
+  def testMyTupleHierarchyWithCaseClass(): Unit = {
+    val ti = createTypeInformation[MyScalaTupleCaseClass]
+    assertTrue(ti.isInstanceOf[MyTupleTypeInfo[_, _]])
+    val mtti = ti.asInstanceOf[MyTupleTypeInfo[_, _]]
+    assertEquals(DOUBLE_TYPE_INFO, mtti.getField0)
+    assertEquals(BOOLEAN_TYPE_INFO, mtti.getField1)
+  }
+
+  @Test
+  def testMyEitherGenericType(): Unit = {
+    val ti = createTypeInformation[MyScalaEither[String, (Double, Int)]]
+    assertTrue(ti.isInstanceOf[JavaEitherTypeInfo[_, _]])
+    val eti = ti.asInstanceOf[JavaEitherTypeInfo[_, _]]
+    assertEquals(STRING_TYPE_INFO, eti.getLeftType)
+    assertTrue(eti.getRightType.isInstanceOf[CaseClassTypeInfo[_]])
+    val cti = eti.getRightType.asInstanceOf[CaseClassTypeInfo[_]]
+    assertEquals(DOUBLE_TYPE_INFO, cti.getTypeAt(0))
+    assertEquals(INT_TYPE_INFO, cti.getTypeAt(1))
+  }
+
+  @Test
+  def testScalaFactory(): Unit = {
+    val ti = createTypeInformation[MyScalaOption[Double]]
+    assertTrue(ti.isInstanceOf[MyScalaOptionTypeInfo])
+    val moti = ti.asInstanceOf[MyScalaOptionTypeInfo]
+    assertEquals(DOUBLE_TYPE_INFO, moti.elementType)
+  }
+}
+
+// --------------------------------------------------------------------------------------------
+//  Utilities
+// --------------------------------------------------------------------------------------------
+
+object TypeInfoFactoryTest {
+
+  @TypeInfo(classOf[IntLikeTypeInfoFactory])
+  case class ScalaIntLike(myint: Int)
+
+  class MyScalaTupleClass extends MyTuple2
+
+  case class MyScalaTupleCaseClass(additional: Boolean) extends MyTuple3[Double]
+
+  @TypeInfo(classOf[MyEitherTypeInfoFactory[_, _]])
+  class MyScalaEither[A, B] {
+    // do nothing here
+  }
+
+  @TypeInfo(classOf[MyScalaOptionTypeInfoFactory])
+  class MyScalaOption[Z] {
+    // do nothing here
+  }
+
+  class MyScalaOptionTypeInfoFactory extends TypeInfoFactory[MyOption[_]] {
+
+    override def createTypeInfo(
+        t: Type,
+        genericParameters: util.Map[String, TypeInformation[_]])
+      : TypeInformation[MyOption[_]] = {
+      new MyScalaOptionTypeInfo(genericParameters.get("Z"))
+    }
+  }
+
+  class MyScalaOptionTypeInfo(val elementType: TypeInformation[_])
+    extends TypeInformation[MyOption[_]] {
+    
+    override def isBasicType: Boolean = ???
+
+    override def isTupleType: Boolean = ???
+
+    override def getArity: Int = ???
+
+    override def getTotalFields: Int = ???
+
+    override def getTypeClass: Class[MyOption[_]] = ???
+
+    override def isKeyType: Boolean = ???
+
+    override def createSerializer(config: ExecutionConfig): TypeSerializer[MyOption[_]] = ???
+
+    override def canEqual(obj: scala.Any): Boolean = ???
+
+    override def hashCode(): Int = ???
+
+    override def toString: String = ???
+
+    override def equals(obj: scala.Any): Boolean = ???
+  }
+}