You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:48:27 UTC
flink git commit: [FLINK-2447] [java api] TypeExtractor returns wrong
type info when a Tuple has two fields of the same POJO type
Repository: flink
Updated Branches:
refs/heads/release-0.9 80d3478c0 -> ec3b98327
[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3b9832
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3b9832
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3b9832
Branch: refs/heads/release-0.9
Commit: ec3b983276e85e0baed98ffa8acf6709f20032de
Parents: 80d3478
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 4 15:30:28 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 16:25:32 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++----
.../type/extractor/PojoTypeExtractionTest.java | 63 ++++++++++++++++++++
2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 41644f9..1ae8d3d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
* functions.
*/
public class TypeExtractor {
+
+ /*
+ * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
+ * The type hierarchy records all types (Classes, ParameterizedTypes, TypeVariables, etc.) and intermediate
+ * types on the path from a given starting type of a function or type (e.g. MyMapper, Tuple2) down to
+ * the type currently being analyzed (which depends on the method, e.g. MyPojoFieldType).
+ *
+ * Thus, it fully qualifies types down to the tuple/POJO field level.
+ *
+ * A typical typeHierarchy could look like:
+ *
+ * UDF: MyMapFunction.class
+ * top-level UDF: MyMapFunctionBase.class
+ * RichMapFunction: RichMapFunction.class
+ * MapFunction: MapFunction.class
+ * Function's OUT: Tuple1<MyPojo>
+ * user-defined POJO: MyPojo.class
+ * user-defined top-level POJO: MyPojoBase.class
+ * POJO field: Tuple1<String>
+ * Field type: String.class
+ *
+ */
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
- // We need this to detect recursive types and not get caught
- // in an endless recursion
- private Set<Class<?>> alreadySeen;
-
protected TypeExtractor() {
- alreadySeen = new HashSet<Class<?>>();
+ // only create instances for special use cases
}
// --------------------------------------------------------------------------------------------
@@ -416,10 +432,12 @@ public class TypeExtractor {
TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
+ ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
if (subtypes[i] instanceof TypeVariable<?>) {
- tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type);
+ tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
if (tupleSubTypes[i] == null) {
@@ -430,7 +448,7 @@ public class TypeExtractor {
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
- tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type);
+ tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
}
}
@@ -912,6 +930,19 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
// Utility methods
// --------------------------------------------------------------------------------------------
+
+ /**
+ * Counts the entries of the type hierarchy that denote the given type.
+ *
+ * <p>An entry matches if it is the very same {@code Type} object, or if one side is a class
+ * type (per {@code isClassType}) whose raw class (per {@code typeToClass}) is the other side.
+ * The raw-class check is applied in both directions: a {@code MyPojo} class entry counts as an
+ * occurrence of {@code MyPojo<T>}, and a {@code MyPojo<T>} entry counts as an occurrence of the
+ * class {@code MyPojo}. Checking only one direction would miss hierarchy entries that are
+ * parameterized forms of the queried class, so self-referencing generic types would not be
+ * counted more than once.
+ *
+ * @param typeHierarchy the types collected so far on the way to the current type
+ * @param type the type to look for
+ * @return number of items with equal type or same raw type
+ */
+ private static int countTypeInHierarchy(List<Type> typeHierarchy, Type type) {
+ int count = 0;
+ for (Type t : typeHierarchy) {
+ // identical object, or same raw class in either direction
+ if (t == type
+ || (isClassType(type) && t == typeToClass(type))
+ || (isClassType(t) && typeToClass(t) == type)) {
+ count++;
+ }
+ }
+ return count;
+ }
/**
* @param curT : start type
@@ -1183,12 +1214,10 @@ public class TypeExtractor {
return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
}
- if (alreadySeen.contains(clazz)) {
+ if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<OUT>(clazz);
}
- alreadySeen.add(clazz);
-
if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
return new GenericTypeInfo<OUT>(clazz);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 1f3f71c..34fde20 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
+ ">"));
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
+
+ /** Fixture: POJO whose only field has the POJO's own type (direct self-reference). */
+ public static class RecursivePojo1 {
+ public RecursivePojo1 field;
+ }
+
+ /** Fixture: POJO that refers to itself indirectly, through the single component of a {@code Tuple1} field. */
+ public static class RecursivePojo2 {
+ public Tuple1<RecursivePojo2> field;
+ }
+
+ /** Fixture: POJO whose field type {@code NestedPojo} points back to it (cycle of length two). */
+ public static class RecursivePojo3 {
+ public NestedPojo field;
+ }
+
+ /** Fixture: intermediate POJO whose only field refers back to {@code RecursivePojo3}. */
+ public static class NestedPojo {
+ public RecursivePojo3 field;
+ }
+
+ @Test
+ public void testRecursivePojo1() {
+ // The self-referencing field is expected to be extracted as a GenericTypeInfo.
+ TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+ Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+ TypeInformation<?> fieldType = ((PojoTypeInfo<?>) typeInfo).getPojoFieldAt(0).type;
+ Assert.assertEquals(GenericTypeInfo.class, fieldType.getClass());
+ }
+
+ @Test
+ public void testRecursivePojo2() {
+ // The POJO refers to itself through a Tuple1 field; the tuple's component is expected to be generic.
+ TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+ Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+ TypeInformation<?> fieldType = ((PojoTypeInfo<?>) typeInfo).getPojoFieldAt(0).type;
+ Assert.assertTrue(fieldType instanceof TupleTypeInfo);
+ TypeInformation<?> tupleComponent = ((TupleTypeInfo<?>) fieldType).getTypeAt(0);
+ Assert.assertEquals(GenericTypeInfo.class, tupleComponent.getClass());
+ }
+
+ @Test
+ public void testRecursivePojo3() {
+ // RecursivePojo3 -> NestedPojo -> RecursivePojo3: the back-reference is expected to be generic.
+ TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+ Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
+ TypeInformation<?> nestedType = ((PojoTypeInfo<?>) typeInfo).getPojoFieldAt(0).type;
+ Assert.assertTrue(nestedType instanceof PojoTypeInfo);
+ TypeInformation<?> backReference = ((PojoTypeInfo<?>) nestedType).getPojoFieldAt(0).type;
+ Assert.assertEquals(GenericTypeInfo.class, backReference.getClass());
+ }
+
+ /** Fixture: simple POJO with two public int fields and an explicit public no-argument constructor. */
+ public static class FooBarPojo {
+ public int foo;
+ public int bar;
+
+ public FooBarPojo() {}
+ }
+
+ /** Fixture: map function whose declared output is a Tuple2 with the same POJO type in both fields. */
+ public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+ @Override
+ public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo input) throws Exception {
+ // testDualUseOfPojo only inspects the declared signature and never invokes map()
+ return null;
+ }
+ }
+
+ @Test
+ public void testDualUseOfPojo() {
+ // The same POJO type occupies both fields of the returned Tuple2; both must be extracted as POJOs.
+ MapFunction<?, ?> mapper = new DuplicateMapper();
+ TypeInformation<?> pojoInput = TypeExtractor.createTypeInfo(FooBarPojo.class);
+ TypeInformation<?> returnType = TypeExtractor.getMapReturnTypes(mapper, (TypeInformation) pojoInput);
+ Assert.assertTrue(returnType instanceof TupleTypeInfo);
+ TupleTypeInfo<?> tupleType = (TupleTypeInfo) returnType;
+ Assert.assertTrue(tupleType.getTypeAt(0) instanceof PojoTypeInfo);
+ Assert.assertTrue(tupleType.getTypeAt(1) instanceof PojoTypeInfo);
+ }
}