You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 14:25:44 UTC

flink git commit: [FLINK-4673] [core] TypeInfoFactory for Either type

Repository: flink
Updated Branches:
  refs/heads/master f11447e58 -> d4d7cc326


[FLINK-4673] [core] TypeInfoFactory for Either type

Removes the explicit parsing for Either from TypeExtractor and adds an
EitherTypeInfoFactory, which Either now declares through its @TypeInfo annotation.

This closes #2545.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4d7cc32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4d7cc32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4d7cc32

Branch: refs/heads/master
Commit: d4d7cc32667016d66c65a7d64601cabd101a0c4d
Parents: f11447e
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Sep 23 14:59:36 2016 -0400
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 14:52:38 2017 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/EitherTypeInfo.java      |  20 +++-
 .../java/typeutils/EitherTypeInfoFactory.java   |  48 ++++++++
 .../flink/api/java/typeutils/TypeExtractor.java | 120 ++++---------------
 .../java/org/apache/flink/types/Either.java     |   3 +
 4 files changed, 91 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 058de12..d4d1aa9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -18,14 +18,19 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.types.Either;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for the {@link Either} type of the Java API.
  *
@@ -43,8 +48,8 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
 
 	@PublicEvolving
 	public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
-		this.leftType = leftType;
-		this.rightType = rightType;
+		this.leftType = checkNotNull(leftType);
+		this.rightType = checkNotNull(rightType);
 	}
 
 	@Override
@@ -80,6 +85,15 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
 
 	@Override
 	@PublicEvolving
+	public Map<String, TypeInformation<?>> getGenericParameters() {
+		Map<String, TypeInformation<?>> m = new HashMap<>();
+		m.put("L", this.leftType);
+		m.put("R", this.rightType);
+		return m;
+	}
+
+	@Override
+	@PublicEvolving
 	public boolean isKeyType() {
 		return false;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
new file mode 100644
index 0000000..be881a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Either;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class EitherTypeInfoFactory<L, R> extends TypeInfoFactory<Either<L, R>> {
+
+	@Override
+	public TypeInformation<Either<L, R>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+		TypeInformation<?> leftType = genericParameters.get("L");
+		TypeInformation<?> rightType = genericParameters.get("R");
+
+		if (leftType == null) {
+			throw new InvalidTypesException("Type extraction is not possible on Either" +
+				" type as it does not contain information about the 'left' type.");
+		}
+
+		if (rightType == null) {
+			throw new InvalidTypesException("Type extraction is not possible on Either" +
+				" type as it does not contain information about the 'right' type.");
+		}
+
+		return new EitherTypeInfo(leftType, rightType);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 08f8c53..df4a0e0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,28 +18,11 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.List;
-
-import java.util.Map;
 import org.apache.avro.specific.SpecificRecordBase;
-
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -66,19 +49,34 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
-import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
 
 /**
  * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
@@ -690,38 +688,6 @@ public class TypeExtractor {
 			return new TupleTypeInfo(typeToClass(t), subTypesInfo);
 			
 		}
-		// check if type is a subclass of Either
-		else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
-			Type curT = t;
-
-			// go up the hierarchy until we reach Either (with or without generics)
-			// collect the types while moving up for a later top-down
-			while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
-				typeHierarchy.add(curT);
-				curT = typeToClass(curT).getGenericSuperclass();
-			}
-
-			// check if Either has generics
-			if (curT instanceof Class<?>) {
-				throw new InvalidTypesException("Either needs to be parameterized by using generics.");
-			}
-
-			typeHierarchy.add(curT);
-
-			// create the type information for the subtypes
-			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
-			// type needs to be treated a pojo due to additional fields
-			if (subTypesInfo == null) {
-				if (t instanceof ParameterizedType) {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
-				}
-				else {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
-				}
-			}
-			// return either info
-			return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
-		}
 		// type depends on another type
 		// e.g. class MyMapper<E> extends MapFunction<String, E>
 		else if (t instanceof TypeVariable) {
@@ -947,7 +913,7 @@ public class TypeExtractor {
 
 	/**
 	 * Creates the TypeInformation for all elements of a type that expects a certain number of
-	 * subtypes (e.g. TupleXX or Either).
+	 * subtypes (e.g. TupleXX).
 	 *
 	 * @param originalType most concrete subclass
 	 * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
@@ -1234,29 +1200,6 @@ public class TypeExtractor {
 					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
 				}
 			}
-			// check for Either
-			else if (typeInfo instanceof EitherTypeInfo) {
-				// check if Either at all
-				if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
-					throw new InvalidTypesException("Either type expected.");
-				}
-
-				// go up the hierarchy until we reach Either (with or without generics)
-				while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
-					typeHierarchy.add(type);
-					type = typeToClass(type).getGenericSuperclass();
-				}
-
-				// check if Either has generics
-				if (type instanceof Class<?>) {
-					throw new InvalidTypesException("Parameterized Either type expected.");
-				}
-
-				EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
-				Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
-				validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
-				validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
-			}
 			// check for primitive array
 			else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
 				Type component;
@@ -1675,11 +1618,6 @@ public class TypeExtractor {
 			throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
 		}
 
-		// check for subclasses of Either
-		if (Either.class.isAssignableFrom(clazz)) {
-			throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
-		}
-
 		// check for Enums
 		if(Enum.class.isAssignableFrom(clazz)) {
 			return new EnumTypeInfo(clazz);
@@ -1956,18 +1894,6 @@ public class TypeExtractor {
 			}
 			return new TupleTypeInfo(value.getClass(), infos);
 		}
-		// we can not extract the types from an Either object since it only contains type information
-		// of one type, but from Either classes
-		else if (value instanceof Either) {
-			try {
-				return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
-			}
-			catch (InvalidTypesException e) {
-				throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
-						+ "as it does not contain information about both possible types. "
-						+ "Please specify the types directly.");
-			}
-		}
 		else {
 			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java
index d61b228..a08e968 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -19,6 +19,8 @@
 package org.apache.flink.types;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
 
 /**
  * This type represents a value of one of two possible types, Left or Right (a
@@ -30,6 +32,7 @@ import org.apache.flink.annotation.Public;
  *            the type of Right
  */
 @Public
+@TypeInfo(EitherTypeInfoFactory.class)
 public abstract class Either<L, R> {
 
 	/**