You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/11 15:52:47 UTC

incubator-flink git commit: Improved input type inference, bug fixing, code simplification

Repository: incubator-flink
Updated Branches:
  refs/heads/master 57ecbccc4 -> 3772d3041


Improved input type inference, bug fixing, code simplification

This closes #176


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3772d304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3772d304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3772d304

Branch: refs/heads/master
Commit: 3772d3041e4914cb8306ce6b73b79fc3a1afed21
Parents: 57ecbcc
Author: twalthr <in...@twalthr.com>
Authored: Mon Nov 3 17:35:58 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 11 14:15:00 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 124 +++++++++++--------
 .../java/type/extractor/TypeExtractorTest.java  |  70 ++++++++++-
 2 files changed, 138 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3772d304/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 7dcaa0f..d52e1b0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -256,32 +256,20 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 		
 		// check if type is a subclass of tuple
-		if ((t instanceof Class<?> && Tuple.class.isAssignableFrom((Class<?>) t))
-			|| (t instanceof ParameterizedType && Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) {
-			
+		if (isClassType(t) && Tuple.class.isAssignableFrom(typeToClass(t))) {
 			Type curT = t;
 			
 			// do not allow usage of Tuple as type
-			if (curT instanceof Class<?> && ((Class<?>) curT).equals(Tuple.class)) {
+			if (typeToClass(t).equals(Tuple.class)) {
 				throw new InvalidTypesException(
 						"Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.");
 			}
 						
 			// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
 			// collect the types while moving up for a later top-down 
-			while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).getSuperclass().equals(
-					Tuple.class))
-					&& !(curT instanceof Class<?> && ((Class<?>) curT).getSuperclass().equals(Tuple.class))) {
+			while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) {
 				typeHierarchy.add(curT);
-				
-				// parameterized type
-				if (curT instanceof ParameterizedType) {
-					curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
-				}
-				// class
-				else {
-					curT = ((Class<?>) curT).getGenericSuperclass();
-				}
+				curT = typeToClass(curT).getGenericSuperclass();
 			}
 			
 			// check if immediate child of Tuple has generics
@@ -289,6 +277,8 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Tuple needs to be parameterized by using generics.");
 			}
 			
+			typeHierarchy.add(curT);
+			
 			ParameterizedType tupleChild = (ParameterizedType) curT;
 			
 			Type[] subtypes = new Type[tupleChild.getActualTypeArguments().length];
@@ -326,10 +316,8 @@ public class TypeExtractor {
 			}
 			
 			Class<?> tAsClass = null;
-			if (t instanceof Class<?>) {
-				tAsClass = (Class<?>) t;
-			} else if (t instanceof ParameterizedType) {
-				tAsClass = (Class<? extends Tuple>) ((ParameterizedType) t).getRawType();
+			if (isClassType(t)) {
+				tAsClass = typeToClass(t);
 			}
 			Preconditions.checkNotNull(tAsClass, "t has a unexpected type");
 			// check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields.
@@ -434,21 +422,32 @@ public class TypeExtractor {
 			returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
 		}
 		
+		// create a new type hierarchy for the input
+		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
+		// copy the function part of the type hierarchy
+		for (Type t : returnTypeHierarchy) {
+			if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) {
+				inputTypeHierarchy.add(t);
+			}
+			else {
+				break;
+			}
+		}
+		ParameterizedType baseClass = (ParameterizedType) inputTypeHierarchy.get(inputTypeHierarchy.size() - 1);
+		
 		TypeInformation<?> info = null;
 		if (in1TypeInfo != null) {
 			// find the deepest type variable that describes the type of input 1
-			ParameterizedType baseClass = (ParameterizedType) returnTypeHierarchy.get(returnTypeHierarchy.size() - 1);
 			Type in1Type = baseClass.getActualTypeArguments()[0];
 
-			info = createTypeInfoFromInput(returnTypeVar, returnTypeHierarchy, in1Type, in1TypeInfo);
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in1Type, in1TypeInfo);
 		}
 
 		if (info == null && in2TypeInfo != null) {
 			// find the deepest type variable that describes the type of input 2
-			ParameterizedType baseClass = (ParameterizedType) returnTypeHierarchy.get(returnTypeHierarchy.size() - 1);
 			Type in2Type = baseClass.getActualTypeArguments()[1];
 
-			info = createTypeInfoFromInput(returnTypeVar, returnTypeHierarchy, in2Type, in2TypeInfo);
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in2Type, in2TypeInfo);
 		}
 
 		if (info != null) {
@@ -458,20 +457,33 @@ public class TypeExtractor {
 		return null;
 	}
 	
-	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, 
-			Type inType, TypeInformation<IN1> inTypeInfo) {
+	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
 		TypeInformation<?> info = null;
+		
 		// the input is a type variable
 		if (inType instanceof TypeVariable) {
-			inType = materializeTypeVariable(returnTypeHierarchy, (TypeVariable<?>) inType);
+			inType = materializeTypeVariable(inputTypeHierarchy, (TypeVariable<?>) inType);
 			info = findCorrespondingInfo(returnTypeVar, inType, inTypeInfo);
 		}
 		// the input is a tuple that may contains type variables
-		else if (inType instanceof ParameterizedType && Tuple.class.isAssignableFrom(((Class<?>)((ParameterizedType) inType).getRawType()))) {
-			Type[] tupleElements = ((ParameterizedType) inType).getActualTypeArguments();
+		else if (isClassType(inType) && Tuple.class.isAssignableFrom(typeToClass(inType))) {
+			ParameterizedType tupleBaseClass = null;
+			
+			// get tuple from possible tuple subclass
+			while (!(isClassType(inType) && typeToClass(inType).getSuperclass().equals(Tuple.class))) {
+				inputTypeHierarchy.add(inType);
+				inType = typeToClass(inType).getGenericSuperclass();
+			}
+			inputTypeHierarchy.add(inType);
+			
+			// we can assume to be parameterized since we
+			// already did input validation
+			tupleBaseClass = (ParameterizedType) inType;
+			
+			Type[] tupleElements = tupleBaseClass.getActualTypeArguments();
 			// go thru all tuple elements and search for type variables
-			for(int i = 0; i < tupleElements.length; i++) {
-				info = createTypeInfoFromInput(returnTypeVar, returnTypeHierarchy, tupleElements[i], ((TupleTypeInfo<?>) inTypeInfo).getTypeAt(i));
+			for (int i = 0; i < tupleElements.length; i++) {
+				info = createTypeInfoFromInput(returnTypeVar, inputTypeHierarchy, tupleElements[i], ((TupleTypeInfo<?>) inTypeInfo).getTypeAt(i));
 				if(info != null) {
 					break;
 				}
@@ -489,6 +501,9 @@ public class TypeExtractor {
 	}
 	
 	private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
+		if (typeHierarchy != null) {
+			typeHierarchy.add(clazz);
+		}
 		Type[] interfaceTypes = clazz.getGenericInterfaces();
 		
 		// search in interfaces for base class
@@ -516,7 +531,7 @@ public class TypeExtractor {
 			if (typeHierarchy != null) {
 				typeHierarchy.add(t);
 			}
-			ParameterizedType baseClassChild = (ParameterizedType) t;				
+			ParameterizedType baseClassChild = (ParameterizedType) t;
 			return baseClassChild.getActualTypeArguments()[pos];
 		}
 		// interface that extended base class as class or parameterized type
@@ -588,30 +603,19 @@ public class TypeExtractor {
 			// check for tuple
 			else if (typeInfo.isTupleType()) {
 				// check if tuple at all
-				if (!(type instanceof Class<?> && Tuple.class.isAssignableFrom((Class<?>) type))
-						&& !(type instanceof ParameterizedType && Tuple.class.isAssignableFrom((Class<?>) ((ParameterizedType) type)
-								.getRawType()))) {
+				if (!(isClassType(type) && Tuple.class.isAssignableFrom(typeToClass(type)))) {
 					throw new InvalidTypesException("Tuple type expected.");
 				}
 				
 				// do not allow usage of Tuple as type
-				if (type instanceof Class<?> && ((Class<?>) type).equals(Tuple.class)) {
+				if (isClassType(type) && typeToClass(type).equals(Tuple.class)) {
 					throw new InvalidTypesException("Concrete subclass of Tuple expected.");
 				}
 				
 				// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
-				while (!(type instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) type).getRawType()).getSuperclass().equals(
-						Tuple.class))
-						&& !(type instanceof Class<?> && ((Class<?>) type).getSuperclass().equals(Tuple.class))) {
+				while (!(isClassType(type) && typeToClass(type).getSuperclass().equals(Tuple.class))) {
 					typeHierarchy.add(type);
-					// parameterized type
-					if (type instanceof ParameterizedType) {
-						type = ((Class<?>) ((ParameterizedType) type).getRawType()).getGenericSuperclass();
-					}
-					// class
-					else {
-						type = ((Class<?>) type).getGenericSuperclass();
-					}
+					type = typeToClass(type).getGenericSuperclass();
 				}
 				
 				// check if immediate child of Tuple has generics
@@ -706,9 +710,7 @@ public class TypeExtractor {
 			// check for POJO
 			else if (typeInfo instanceof PojoTypeInfo) {
 				Class<?> clazz = null;
-				if (!(type instanceof Class<?> && ((PojoTypeInfo<?>) typeInfo).getTypeClass() == (clazz = (Class<?>) type))
-						&& !(type instanceof ParameterizedType && (clazz = (Class<?>) ((ParameterizedType) type).getRawType()) == ((PojoTypeInfo<?>) typeInfo)
-								.getTypeClass())) {
+				if (!(isClassType(type) && ((PojoTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) {
 					throw new InvalidTypesException("POJO type '"
 							+ ((PojoTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
 							+ clazz.getCanonicalName() + "'.");
@@ -717,9 +719,7 @@ public class TypeExtractor {
 			// check for generic object
 			else if (typeInfo instanceof GenericTypeInfo<?>) {
 				Class<?> clazz = null;
-				if (!(type instanceof Class<?> && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = (Class<?>) type))
-						&& !(type instanceof ParameterizedType && (clazz = (Class<?>) ((ParameterizedType) type).getRawType()) == ((GenericTypeInfo<?>) typeInfo)
-								.getTypeClass())) {
+				if (!(isClassType(type) && ((GenericTypeInfo<?>) typeInfo).getTypeClass() == (clazz = typeToClass(type)))) {
 					throw new InvalidTypesException("Generic object type '"
 							+ ((GenericTypeInfo<?>) typeInfo).getTypeClass().getCanonicalName() + "' expected but was '"
 							+ clazz.getCanonicalName() + "'.");
@@ -739,8 +739,8 @@ public class TypeExtractor {
 	
 	private static Type removeGenericWrapper(Type t) {
 		if(t instanceof ParameterizedType 	&& 
-				(Collector.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType())
-						|| Iterable.class.isAssignableFrom((Class<?>) ((ParameterizedType) t).getRawType()))) {
+				(Collector.class.isAssignableFrom(typeToClass(t))
+						|| Iterable.class.isAssignableFrom(typeToClass(t)))) {
 			return ((ParameterizedType) t).getActualTypeArguments()[0];
 		}
 		return t;
@@ -1113,6 +1113,20 @@ public class TypeExtractor {
 		}
 		return result;
 	}
+	
+	private static Class<?> typeToClass(Type t) {
+		if (t instanceof Class) {
+			return (Class<?>)t;
+		}
+		else if (t instanceof ParameterizedType) {
+			return ((Class<?>)((ParameterizedType) t).getRawType());
+		}
+		throw new IllegalArgumentException("Cannot convert type to class");
+	}
+	
+	private static boolean isClassType(Type t) {
+		return t instanceof Class<?> || t instanceof ParameterizedType;
+	}
 
 
 	public static <X> TypeInformation<X> getForObject(X value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3772d304/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index 695a42e..00c0194 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -60,7 +60,6 @@ import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 
@@ -1534,4 +1533,73 @@ public class TypeExtractorTest {
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0));
 		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1));
 	}
+	
+	public static class Edge<K, V> extends Tuple3<K, K, V> {
+		private static final long serialVersionUID = 1L;
+		
+	}
+	
+	public static class EdgeMapper<K, V> implements MapFunction<Edge<K, V>, Edge<K, V>> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public Edge<K, V> map(Edge<K, V> value) throws Exception {
+			return null;
+		}
+		
+	}
+	
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testInputInference1() {
+		EdgeMapper<String, Double> em = new EdgeMapper<String, Double>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<String, String, Double>"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(3, ti.getArity());
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0));
+		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1));
+		Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2));
+	}
+	
+	public static class EdgeMapper2<V> implements MapFunction<V, Edge<Long, V>> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public Edge<Long, V> map(V value) throws Exception {
+			return null;
+		}
+		
+	}
+	
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testInputInference2() {
+		EdgeMapper2<Boolean> em = new EdgeMapper2<Boolean>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Boolean"));
+		Assert.assertTrue(ti.isTupleType());
+		Assert.assertEquals(3, ti.getArity());
+		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(0));
+		Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1));
+		Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(2));
+	}
+	
+	public static class EdgeMapper3<K, V> implements MapFunction<Edge<K, V>, V> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public V map(Edge<K, V> value) throws Exception {
+			return null;
+		}
+	}
+	
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testInputInference3() {
+		EdgeMapper3<Boolean, String> em = new EdgeMapper3<Boolean, String>();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>"));
+		Assert.assertTrue(ti.isBasicType());
+		Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
+	}
 }