You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tw...@apache.org on 2015/11/27 14:59:39 UTC

flink git commit: [FLINK-3046] Integrate the Either Java type with the TypeExtractor

Repository: flink
Updated Branches:
  refs/heads/master e69d14521 -> f73a12e72


[FLINK-3046] Integrate the Either Java type with the TypeExtractor

This closes #1393.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f73a12e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f73a12e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f73a12e7

Branch: refs/heads/master
Commit: f73a12e728431d87065f7d0082380a527df39e44
Parents: e69d145
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 23 15:08:47 2015 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Nov 27 14:58:13 2015 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/EitherTypeInfo.java      |  12 +-
 .../flink/api/java/typeutils/TypeExtractor.java | 196 ++++++++++++++-----
 .../java/type/extractor/TypeExtractorTest.java  |  81 +++++++-
 3 files changed, 235 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 40ed0c0..ec7be97 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -37,7 +37,7 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
 
 	private final TypeInformation<R> rightType;
 
-	public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType) {
+	public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
 		this.leftType = leftType;
 		this.rightType = rightType;
 	}
@@ -108,4 +108,14 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
 		return obj instanceof EitherTypeInfo;
 	}
 
+	// --------------------------------------------------------------------------------------------
+
+	public TypeInformation<L> getLeftType() {
+		return leftType;
+	}
+
+	public TypeInformation<R> getRightType() {
+		return rightType;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 782d58d..ff6a82c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -420,65 +420,53 @@ public class TypeExtractor {
 			}
 			
 			typeHierarchy.add(curT);
-			
-			ParameterizedType tupleChild = (ParameterizedType) curT;
-			
-			Type[] subtypes = new Type[tupleChild.getActualTypeArguments().length];
-			
-			// materialize possible type variables
-			for (int i = 0; i < subtypes.length; i++) {
-				// materialize immediate TypeVariables
-				if (tupleChild.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
-					subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) tupleChild.getActualTypeArguments()[i]);
+
+			// create the type information for the subtypes
+			TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+			// type needs to be treated a pojo due to additional fields
+			if (subTypesInfo == null) {
+				if (t instanceof ParameterizedType) {
+					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
 				}
-				// class or parameterized type
 				else {
-					subtypes[i] = tupleChild.getActualTypeArguments()[i];
+					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
 				}
 			}
+			// return tuple info
+			return new TupleTypeInfo(typeToClass(t), subTypesInfo);
 			
-			TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
-			for (int i = 0; i < subtypes.length; i++) {
-				ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
-				subTypeHierarchy.add(subtypes[i]);
-				// sub type could not be determined with materializing
-				// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
-				if (subtypes[i] instanceof TypeVariable<?>) {
-					tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
-					
-					// variable could not be determined
-					if (tupleSubTypes[i] == null) {
-						throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
-								+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
-								+ "' could not be determined. This is most likely a type erasure problem. "
-								+ "The type extraction currently supports types with generic variables only in cases where "
-								+ "all variables in the return type can be deduced from the input type(s).");
-					}
-				} else {
-					tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
-				}
+		}
+		// check if type is a subclass of Either
+		else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
+			Type curT = t;
+
+			// go up the hierarchy until we reach Either (with or without generics)
+			// collect the types while moving up for a later top-down
+			while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
+				typeHierarchy.add(curT);
+				curT = typeToClass(curT).getGenericSuperclass();
 			}
-			
-			Class<?> tAsClass = null;
-			if (isClassType(t)) {
-				tAsClass = typeToClass(t);
-			}
-			Preconditions.checkNotNull(tAsClass, "t has a unexpected type");
-			// check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields.
-			// check for additional fields.
-			int fieldCount = countFieldsInClass(tAsClass);
-			if(fieldCount != tupleSubTypes.length) {
-				// the class is not a real tuple because it contains additional fields. treat as a pojo
+
+			// check if Either has generics
+			if (curT instanceof Class<?>) {
+				throw new InvalidTypesException("Either needs to be parameterized by using generics.");
+			}
+
+			typeHierarchy.add(curT);
+
+			// create the type information for the subtypes
+			TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+			// type needs to be treated a pojo due to additional fields
+			if (subTypesInfo == null) {
 				if (t instanceof ParameterizedType) {
-					return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
+					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
 				}
 				else {
-					return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
+					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
 				}
 			}
-			
-			return new TupleTypeInfo(tAsClass, tupleSubTypes);
-			
+			// return either info
+			return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
 		}
 		// type depends on another type
 		// e.g. class MyMapper<E> extends MapFunction<String, E>
@@ -675,6 +663,71 @@ public class TypeExtractor {
 		}
 		return info;
 	}
+
+	/**
+	 * Creates the TypeInformation for all elements of a type that expects a certain number of
+	 * subtypes (e.g. TupleXX or Either).
+	 *
+	 * @param originalType most concrete subclass
+	 * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
+	 * @param typeHierarchy necessary for type inference
+	 * @param in1Type necessary for type inference
+	 * @param in2Type necessary for type inference
+	 * @return array containing TypeInformation of sub types or null if definingType contains
+	 *     more subtypes (fields) that defined
+	 */
+	private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
+			ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+		Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
+
+		// materialize possible type variables
+		for (int i = 0; i < subtypes.length; i++) {
+			// materialize immediate TypeVariables
+			if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
+				subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]);
+			}
+			// class or parameterized type
+			else {
+				subtypes[i] = definingType.getActualTypeArguments()[i];
+			}
+		}
+
+		TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
+		for (int i = 0; i < subtypes.length; i++) {
+			ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+			subTypeHierarchy.add(subtypes[i]);
+			// sub type could not be determined with materializing
+			// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
+			if (subtypes[i] instanceof TypeVariable<?>) {
+				subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
+
+				// variable could not be determined
+				if (subTypesInfo[i] == null) {
+					throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+							+ ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+							+ "' could not be determined. This is most likely a type erasure problem. "
+							+ "The type extraction currently supports types with generic variables only in cases where "
+							+ "all variables in the return type can be deduced from the input type(s).");
+				}
+			} else {
+				subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+			}
+		}
+
+		Class<?> originalTypeAsClass = null;
+		if (isClassType(originalType)) {
+			originalTypeAsClass = typeToClass(originalType);
+		}
+		Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+		// check if the class we assumed to conform to the defining type so far is actually a pojo because the
+		// original type contains additional fields.
+		// check for additional fields.
+		int fieldCount = countFieldsInClass(originalTypeAsClass);
+		if(fieldCount > subTypesInfo.length) {
+			return null;
+		}
+		return subTypesInfo;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Extract type parameters
@@ -830,9 +883,32 @@ public class TypeExtractor {
 				}
 				
 				for (int i = 0; i < subTypes.length; i++) {
-					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], ((TupleTypeInfo<?>) typeInfo).getTypeAt(i));
+					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
 				}
 			}
+			// check for Either
+			else if (typeInfo instanceof EitherTypeInfo) {
+				// check if Either at all
+				if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
+					throw new InvalidTypesException("Either type expected.");
+				}
+
+				// go up the hierarchy until we reach Either (with or without generics)
+				while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
+					typeHierarchy.add(type);
+					type = typeToClass(type).getGenericSuperclass();
+				}
+
+				// check if Either has generics
+				if (type instanceof Class<?>) {
+					throw new InvalidTypesException("Parameterized Either type expected.");
+				}
+
+				EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
+				Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
+				validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
+				validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
+			}
 			// check for Writable
 			else if (typeInfo instanceof WritableTypeInfo<?>) {
 				// check if writable at all
@@ -1224,7 +1300,7 @@ public class TypeExtractor {
 		if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) {
 			return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
 		}
-		
+
 		// check for basic types
 		TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
 		if (basicTypeInfo != null) {
@@ -1245,6 +1321,11 @@ public class TypeExtractor {
 			throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
 		}
 
+		// check for subclasses of Either
+		if (Either.class.isAssignableFrom(clazz)) {
+			throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
+		}
+
 		// check for Enums
 		if(Enum.class.isAssignableFrom(clazz)) {
 			return new EnumTypeInfo(clazz);
@@ -1558,7 +1639,20 @@ public class TypeExtractor {
 				infos[i] = privateGetForObject(field);
 			}
 			return new TupleTypeInfo(value.getClass(), infos);
-		} else {
+		}
+		// we can not extract the types from an Either object since it only contains type information
+		// of one type, but from Either classes
+		else if (value instanceof Either) {
+			try {
+				return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
+			}
+			catch (InvalidTypesException e) {
+				throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
+						+ "as it does not contain information about both possible types. "
+						+ "Please specify the types directly.");
+			}
+		}
+		else {
 			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index eae767d..7abfc76 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -46,6 +46,8 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
@@ -67,8 +69,6 @@ import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.xml.bind.TypeConstraintException;
-
 
 public class TypeExtractorTest {
 
@@ -1829,4 +1829,81 @@ public class TypeExtractorTest {
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, BasicTypeInfo.INT_TYPE_INFO);
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
 	}
+
+	public static class Either1<T> extends Either<String, T> {
+		@Override
+		public String left() throws IllegalStateException {
+			return null;
+		}
+
+		@Override
+		public T right() throws IllegalStateException {
+			return null;
+		}
+	}
+
+	public static class Either2 extends Either1<Tuple1<Integer>> {
+		// nothing to do here
+	}
+
+	public static class EitherMapper<T> implements MapFunction<T, Either1<T>> {
+		@Override
+		public Either1<T> map(T value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class EitherMapper2 implements MapFunction<String, Either2> {
+		@Override
+		public Either2 map(String value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class EitherMapper3 implements MapFunction<Either2, Either2> {
+		@Override
+		public Either2 map(Either2 value) throws Exception {
+			return null;
+		}
+	}
+
+	@Test
+	public void testEither() {
+		MapFunction<?, ?> function = new MapFunction<Either<String, Boolean>, Either<String, Boolean>>() {
+			@Override
+			public Either<String, Boolean> map(Either<String, Boolean> value) throws Exception {
+				return null;
+			}
+		};
+		TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
+		Assert.assertEquals(expected, ti);
+	}
+
+	@Test
+	public void testEitherHierarchy() {
+		MapFunction<?, ?> function = new EitherMapper<Boolean>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+		TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+		Assert.assertEquals(expected, ti);
+
+		function = new EitherMapper2();
+		ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.STRING_TYPE_INFO);
+		expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO));
+		Assert.assertEquals(expected, ti);
+
+		function = new EitherMapper3();
+		ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
+		Assert.assertEquals(expected, ti);
+
+		Either<String, Tuple1<Integer>> either = new Either2();
+		ti = TypeExtractor.getForObject(either);
+		Assert.assertEquals(expected, ti);
+	}
+
+	@Test(expected=InvalidTypesException.class)
+	public void testEitherFromObjectException() {
+		Either<String, Tuple1<Integer>> either = Either.left("test");
+		TypeExtractor.getForObject(either);
+	}
 }