You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:42:27 UTC
[4/7] incubator-flink git commit: Fix invalid type hierarchy creation by Pojo logic
Fix invalid type hierarchy creation by Pojo logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1ac9651a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1ac9651a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1ac9651a
Branch: refs/heads/release-0.8
Commit: 1ac9651abff2760a2a66c6cce0e3aa2e1bf5d1dd
Parents: 02bad15
Author: twalthr <in...@twalthr.com>
Authored: Wed Dec 10 22:02:07 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 21:41:09 2014 +0100
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 87 ++++++++++----------
1 file changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1ac9651a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 67b2a51..3bceac5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -232,29 +232,6 @@ public class TypeExtractor {
// get info from hierarchy
return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
-
-
- /**
- * @param curT : start type
- * @return Type The immediate child of the top class
- */
- private Type recursivelyGetTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
- while (!(curT instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) curT).getRawType()).equals(
- stopAtClass))
- && !(curT instanceof Class<?> && ((Class<?>) curT).equals(stopAtClass))) {
- typeHierarchy.add(curT);
-
- // parameterized type
- if (curT instanceof ParameterizedType) {
- curT = ((Class<?>) ((ParameterizedType) curT).getRawType()).getGenericSuperclass();
- }
- // class
- else {
- curT = ((Class<?>) curT).getGenericSuperclass();
- }
- }
- return curT;
- }
@SuppressWarnings({ "unchecked", "rawtypes" })
private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
@@ -330,7 +307,7 @@ public class TypeExtractor {
int fieldCount = countFieldsInClass(tAsClass);
if(fieldCount != tupleSubTypes.length) {
// the class is not a real tuple because it contains additional fields. treat as a pojo
- return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
+ return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null); // the typeHierarchy here should be sufficient, even though it stops at the Tuple.class.
}
return new TupleTypeInfo(tAsClass, tupleSubTypes);
@@ -396,23 +373,11 @@ public class TypeExtractor {
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
- return privateGetForClass((Class<OUT>) t, new ArrayList<Type>());
+ return privateGetForClass((Class<OUT>) t, typeHierarchy);
}
throw new InvalidTypesException("Type Information could not be created.");
}
-
- private int countFieldsInClass(Class<?> clazz) {
- int fieldCount = 0;
- for(Field field : clazz.getFields()) { // get all fields
- if( !Modifier.isStatic(field.getModifiers()) &&
- !Modifier.isTransient(field.getModifiers())
- ) {
- fieldCount++;
- }
- }
- return fieldCount;
- }
private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
@@ -427,6 +392,11 @@ public class TypeExtractor {
returnTypeVar = (TypeVariable<?>) matReturnTypeVar;
}
+ // no input information exists
+ if (in1TypeInfo == null && in2TypeInfo == null) {
+ return null;
+ }
+
// create a new type hierarchy for the input
ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
// copy the function part of the type hierarchy
@@ -753,6 +723,34 @@ public class TypeExtractor {
// Utility methods
// --------------------------------------------------------------------------------------------
+ /**
+ * @param curT : start type
+ * @return Type The immediate child of the top class
+ */
+ private Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type curT, Class<?> stopAtClass) {
+ // skip first one
+ if (typeHierarchy.size() > 0 && typeHierarchy.get(0) == curT && isClassType(curT)) {
+ curT = typeToClass(curT).getGenericSuperclass();
+ }
+ while (!(isClassType(curT) && typeToClass(curT).equals(stopAtClass))) {
+ typeHierarchy.add(curT);
+ curT = typeToClass(curT).getGenericSuperclass();
+ }
+ return curT;
+ }
+
+ private int countFieldsInClass(Class<?> clazz) {
+ int fieldCount = 0;
+ for(Field field : clazz.getFields()) { // get all fields
+ if( !Modifier.isStatic(field.getModifiers()) &&
+ !Modifier.isTransient(field.getModifiers())
+ ) {
+ fieldCount++;
+ }
+ }
+ return fieldCount;
+ }
+
private static Type removeGenericWrapper(Type t) {
if(t instanceof ParameterizedType &&
(Collector.class.isAssignableFrom(typeToClass(t))
@@ -954,7 +952,7 @@ public class TypeExtractor {
return new GenericTypeInfo<X>(clazz);
}
try {
- TypeInformation<X> pojoType = analyzePojo(clazz, typeHierarchy, clazzTypeHint);
+ TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
if (pojoType != null) {
return pojoType;
}
@@ -1032,12 +1030,12 @@ public class TypeExtractor {
}
private <X> TypeInformation<X> analyzePojo(Class<X> clazz, ArrayList<Type> typeHierarchy, ParameterizedType clazzTypeHint) {
- // try to create Type hierarchy, if the incoming one is empty.
- if(typeHierarchy.size() == 0) {
- recursivelyGetTypeHierarchy(typeHierarchy, clazz, Object.class);
+ // try to create Type hierarchy, if the incoming only contains the most bottom one or none.
+ if(typeHierarchy.size() <= 1) {
+ getTypeHierarchy(typeHierarchy, clazz, Object.class);
}
if(clazzTypeHint != null) {
- recursivelyGetTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
+ getTypeHierarchy(typeHierarchy, clazzTypeHint, Object.class);
}
List<Field> fields = getAllDeclaredFields(clazz);
@@ -1049,8 +1047,9 @@ public class TypeExtractor {
return null;
}
try {
- typeHierarchy.add(fieldType);
- pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(typeHierarchy, fieldType, null, null) ));
+ ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ fieldTypeHierarchy.add(fieldType);
+ pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
} catch (InvalidTypesException e) {
//pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""