You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:48:27 UTC

flink git commit: [FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

Repository: flink
Updated Branches:
  refs/heads/release-0.9 80d3478c0 -> ec3b98327


[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec3b9832
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec3b9832
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec3b9832

Branch: refs/heads/release-0.9
Commit: ec3b983276e85e0baed98ffa8acf6709f20032de
Parents: 80d3478
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 4 15:30:28 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 16:25:32 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++----
 .../type/extractor/PojoTypeExtractionTest.java  | 63 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 41644f9..1ae8d3d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
  * functions.
  */
 public class TypeExtractor {
+
+	/*
+	 * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
+	 * The type hierarchy lists all types (Classes, ParameterizedTypes, TypeVariables, etc.) and intermediate
+	 * types, starting from the type of a given function or type (e.g. MyMapper, Tuple2) down to the current type
+	 * (which depends on the method, e.g. MyPojoFieldType).
+	 *
+	 * Thus, it fully qualifies types down to the tuple/POJO field level.
+	 *
+	 * A typical typeHierarchy could look like:
+	 *
+	 * UDF: MyMapFunction.class
+	 * top-level UDF: MyMapFunctionBase.class
+	 * RichMapFunction: RichMapFunction.class
+	 * MapFunction: MapFunction.class
+	 * Function's OUT: Tuple1<MyPojo>
+	 * user-defined POJO: MyPojo.class
+	 * user-defined top-level POJO: MyPojoBase.class
+	 * POJO field: Tuple1<String>
+	 * Field type: String.class
+	 *
+	 */
 	
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
-	// We need this to detect recursive types and not get caught
-	// in an endless recursion
-	private Set<Class<?>> alreadySeen;
-
 	protected TypeExtractor() {
-		alreadySeen = new HashSet<Class<?>>();
+		// only create instances for special use cases
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -416,10 +432,12 @@ public class TypeExtractor {
 			
 			TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
 			for (int i = 0; i < subtypes.length; i++) {
+				ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				subTypeHierarchy.add(subtypes[i]);
 				// sub type could not be determined with materializing
 				// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
 				if (subtypes[i] instanceof TypeVariable<?>) {
-					tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type);
+					tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
 					
 					// variable could not be determined
 					if (tupleSubTypes[i] == null) {
@@ -430,7 +448,7 @@ public class TypeExtractor {
 								+ "all variables in the return type can be deduced from the input type(s).");
 					}
 				} else {
-					tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type);
+					tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
 				}
 			}
 			
@@ -912,6 +930,19 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	//  Utility methods
 	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * @return number of entries in the type hierarchy that are the given type itself or its raw class
+	 */
+	private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
+		int count = 0;
+		for (Type t : typeHierarchy) {
+			if (t == type || (isClassType(type) && t == typeToClass(type))) {
+				count++;
+			}
+		}
+		return count;
+	}
 	
 	/**
 	 * @param curT : start type
@@ -1183,12 +1214,10 @@ public class TypeExtractor {
 			return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
 		}
 
-		if (alreadySeen.contains(clazz)) {
+		if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
 			return new GenericTypeInfo<OUT>(clazz);
 		}
 
-		alreadySeen.add(clazz);
-
 		if (Modifier.isInterface(clazz.getModifiers())) {
 			// Interface has no members and is therefore not handled as POJO
 			return new GenericTypeInfo<OUT>(clazz);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec3b9832/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 1f3f71c..34fde20 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
 						+ ">"));
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
+
+	public static class RecursivePojo1 {
+		public RecursivePojo1 field;
+	}
+
+	public static class RecursivePojo2 {
+		public Tuple1<RecursivePojo2> field;
+	}
+
+	public static class RecursivePojo3 {
+		public NestedPojo field;
+	}
+
+	public static class NestedPojo {
+		public RecursivePojo3 field;
+	}
+
+	@Test
+	public void testRecursivePojo1() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
+	}
+
+	@Test
+	public void testRecursivePojo2() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.type instanceof TupleTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
+	}
+
+	@Test
+	public void testRecursivePojo3() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.type instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
+	}
+
+	public static class FooBarPojo {
+		public int foo, bar;
+		public FooBarPojo() {}
+	}
+
+	public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+		@Override
+		public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
+			return null;
+		}
+	}
+
+	@Test
+	public void testDualUseOfPojo() {
+		MapFunction<?, ?> function = new DuplicateMapper();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
+		Assert.assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
+		Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+	}
 }