You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:42:27 UTC

[4/7] incubator-flink git commit: Fix invalid type hierarchy creation by Pojo logic

Fix invalid type hierarchy creation by Pojo logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1ac9651a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1ac9651a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1ac9651a

Branch: refs/heads/release-0.8
Commit: 1ac9651abff2760a2a66c6cce0e3aa2e1bf5d1dd
Parents: 02bad15
Author: twalthr <in...@twalthr.com>
Authored: Wed Dec 10 22:02:07 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 87 ++++++++++----------
 1 file changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1ac9651a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 67b2a51..3bceac5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -232,29 +232,6 @@ public class TypeExtractor {
 		// get info from hierarchy
 		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
-
-
-	/**
-	 * @param curT : start type
-	 * @return Type The immediate child of the top class
-	 */
-	private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
-		while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals(
-				stopAtClass))
-				&& !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) {
-			typeHierarchy.add(curT);
-			
-			// parameterized type
-			if (curT instanceof ParameterizedType) {
-				curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
-			}
-			// class
-			else {
-				curT = ((Class<?>) curT).getGenericSuperclass();
-			}
-		}
-		return curT;
-	}
 	
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
@@ -330,7 +307,7 @@ public class TypeExtractor {
 			int fieldCount = countFieldsInClass(tAsClass);
 			if(fieldCount != tupleSubTypes.length) {
 				// the class is not a real tuple because it contains additional fields. treat as a pojo
-				return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
+				return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
 			}
 			
 			return new TupleTypeInfo(tAsClass, tupleSubTypes);
@@ -396,23 +373,11 @@ public class TypeExtractor {
 		}
 		// no tuple, no TypeVariable, no generic type
 		else if (t instanceof Class) {
-			return privateGetForClass((Class<OUT>) t, new ArrayList<Type>());
+			return privateGetForClass((Class<OUT>) t, typeHierarchy);
 		}
 		
 		throw new InvalidTypesException("Type Information could not be created.");
 	}
-	
-	private int countFieldsInClass(Class<?> clazz) {
-		int fieldCount = 0;
-		for(Field field : clazz.getFields()) { // get all fields
-			if(	!Modifier.isStatic(field.getModifiers()) &&
-				!Modifier.isTransient(field.getModifiers())
-				) {
-				fieldCount++;
-			}
-		}
-		return fieldCount;
-	}
 
 	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy, 
 			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
@@ -427,6 +392,11 @@ public class TypeExtractor {
 			returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
 		}
 		
+		// no input information exists
+		if (in1TypeInfo == null && in2TypeInfo == null) {
+			return null;
+		}
+		
 		// create a new type hierarchy for the input
 		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
 		// copy the function part of the type hierarchy
@@ -753,6 +723,34 @@ public class TypeExtractor {
 	//  Utility methods
 	// --------------------------------------------------------------------------------------------
 	
+	/**
+	 * @param curT : start type
+	 * @return Type The immediate child of the top class
+	 */
+	private Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
+		// skip first one
+		if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
+			curT = typeToClass(curT).getGenericSuperclass();
+		}
+		while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
+			typeHierarchy.add(curT);
+			curT = typeToClass(curT).getGenericSuperclass();
+		}
+		return curT;
+	}
+	
+	private int countFieldsInClass(Class<?> clazz) {
+		int fieldCount = 0;
+		for(Field field : clazz.getFields()) { // get all fields
+			if(	!Modifier.isStatic(field.getModifiers()) &&
+				!Modifier.isTransient(field.getModifiers())
+				) {
+				fieldCount++;
+			}
+		}
+		return fieldCount;
+	}
+	
 	private static Type removeGenericWrapper(Type t) {
 		if(t instanceof ParameterizedType 	&& 
 				(Collector.class.isAssignableFrom(typeToClass(t))
@@ -954,7 +952,7 @@ public class TypeExtractor {
 			return new GenericTypeInfo<X>(clazz);
 		}
 		try {
-			TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
+			TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
 			if (pojoType != null) {
 				return pojoType;
 			}
@@ -1032,12 +1030,12 @@ public class TypeExtractor {
 	}
 
 	private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
-		// try to create Type hierarchy, if the incoming one is empty.
-		if(typeHierarchy.size() == 0) {
-			recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
+		// try to create Type hierarchy, if the incoming only contains the most bottom one or none.
+		if(typeHierarchy.size() <= 1) {
+			getTypeHierarchy(typeHierarchy, clazz, Object.class);
 		}
 		if(clazzTypeHint != null) {
-			recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
+			getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
 		}
 		
 		List<Field> fields = getAllDeclaredFields(clazz);
@@ -1049,8 +1047,9 @@ public class TypeExtractor {
 				return null;
 			}
 			try {
-				typeHierarchy.add(fieldType);
-				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
+				ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				fieldTypeHierarchy.add(fieldType);
+				pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
 			} catch (InvalidTypesException e) {
 				//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
 				throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""