You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/30 14:13:10 UTC

[flink] branch master updated (1c09c23 -> a884706)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1c09c23  [FLINK-16048][avro] Support read/write confluent schema registry avro data from Kafka
     new 250a6c1  [hotfix] Remove dead code in TypeExtractor
     new 3309d14  [hotfix] Remove warnings in TypeExtractor, AvroTypeInfo
     new d05960c  [hotfix] Use List instead of ArrayList in TypeExtractor
     new a884706  [FLINK-12175] Change filling of typeHierarchy in analyzePojo, for correctly creating fields TypeInfo

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/api/common/typeinfo/TypeInfoFactory.java |   4 +-
 .../api/java/typeutils/TypeExtractionUtils.java    |   7 +-
 .../flink/api/java/typeutils/TypeExtractor.java    | 255 ++++++++-------------
 .../PojoParametrizedTypeExtractionTest.java        | 100 ++++++++
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  22 +-
 5 files changed, 213 insertions(+), 175 deletions(-)
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java


[flink] 01/04: [hotfix] Remove dead code in TypeExtractor

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 250a6c1ca911535efb5a03433e3f95daf8be3ff7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 29 13:53:54 2020 +0200

    [hotfix] Remove dead code in TypeExtractor
---
 .../flink/api/common/typeinfo/TypeInfoFactory.java |  4 +-
 .../flink/api/java/typeutils/TypeExtractor.java    | 72 ++++------------------
 2 files changed, 13 insertions(+), 63 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
index 898b05e..54f9335 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.lang.reflect.Type;
 import java.util.Map;
@@ -29,8 +28,7 @@ import java.util.Map;
  * plugging-in user-defined {@link TypeInformation} into the Flink type system. The factory is
  * called during the type extraction phase if the corresponding type has been annotated with
  * {@link TypeInfo}. In a hierarchy of types the closest factory will be chosen while traversing
- * upwards, however, a globally registered factory has highest precedence
- * (see {@link TypeExtractor#registerFactory(Type, Class)}).
+ * upwards.
  *
  * @param <T> type for which {@link TypeInformation} is created
  */
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index adcfdbb..b94dd5f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.commons.lang3.ClassUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -53,6 +52,8 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ClassUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,14 +73,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
@@ -132,34 +133,6 @@ public class TypeExtractor {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  TypeInfoFactory registry
-	// --------------------------------------------------------------------------------------------
-
-	private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
-
-	/**
-	 * Registers a type information factory globally for a certain type. Every following type extraction
-	 * operation will use the provided factory for this type. The factory will have highest precedence
-	 * for this type. In a hierarchy of types the registered factory has higher precedence than annotations
-	 * at the same level but lower precedence than factories defined down the hierarchy.
-	 *
-	 * @param t type for which a new factory is registered
-	 * @param factory type information factory that will produce {@link TypeInformation}
-	 */
-	private static void registerFactory(Type t, Class<? extends TypeInfoFactory> factory) {
-		Preconditions.checkNotNull(t, "Type parameter must not be null.");
-		Preconditions.checkNotNull(factory, "Factory parameter must not be null.");
-
-		if (!TypeInfoFactory.class.isAssignableFrom(factory)) {
-			throw new IllegalArgumentException("Class is not a TypeInfoFactory.");
-		}
-		if (registeredTypeInfoFactories.containsKey(t)) {
-			throw new InvalidTypesException("A TypeInfoFactory for type '" + t + "' is already registered.");
-		}
-		registeredTypeInfoFactories.put(t, factory);
-	}
-
-	// --------------------------------------------------------------------------------------------
 	//  Function specific methods
 	// --------------------------------------------------------------------------------------------
 
@@ -1498,22 +1471,6 @@ public class TypeExtractor {
 		}
 	}
 
-	private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) {
-		List<Method> methods = getAllDeclaredMethods(typeInfo.getTypeClass());
-		for (Method method : methods) {
-			if (exec.executablesEquals(method)) {
-				return;
-			}
-		}
-		Constructor<?>[] constructors = typeInfo.getTypeClass().getDeclaredConstructors();
-		for (Constructor<?> constructor : constructors) {
-			if (exec.executablesEquals(constructor)) {
-				return;
-			}
-		}
-		throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'.");
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Utility methods
 	// --------------------------------------------------------------------------------------------
@@ -1524,19 +1481,14 @@ public class TypeExtractor {
 	@Internal
 	public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
 		final Class<?> factoryClass;
-		if (registeredTypeInfoFactories.containsKey(t)) {
-			factoryClass = registeredTypeInfoFactories.get(t);
+		if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
+			return null;
 		}
-		else {
-			if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
-				return null;
-			}
-			final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
-			factoryClass = typeInfoAnnotation.value();
-			// check for valid factory class
-			if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
-				throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
-			}
+		final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
+		factoryClass = typeInfoAnnotation.value();
+		// check for valid factory class
+		if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
+			throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
 		}
 
 		// instantiate


[flink] 03/04: [hotfix] Use List instead of ArrayList in TypeExtractor

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d05960c1ecefe622e297cd41a38d68ada3016f28
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 29 14:21:06 2020 +0200

    [hotfix] Use List instead of ArrayList in TypeExtractor
---
 .../flink/api/java/typeutils/TypeExtractor.java    | 50 +++++++++++-----------
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  4 +-
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a990f76..318f10d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -749,7 +749,7 @@ public class TypeExtractor {
 	// ----------------------------------- private methods ----------------------------------------
 
 	private TypeInformation<?> privateCreateTypeInfo(Type t) {
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		List<Type> typeHierarchy = new ArrayList<>();
 		typeHierarchy.add(t);
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
 	}
@@ -758,7 +758,7 @@ public class TypeExtractor {
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		List<Type> typeHierarchy = new ArrayList<>();
 		Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
 
 		TypeInformation<OUT> typeInfo;
@@ -778,14 +778,14 @@ public class TypeExtractor {
 
 	// for LambdaFunctions
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		List<Type> typeHierarchy = new ArrayList<>();
 
 		// get info from hierarchy
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(ArrayList<Type> typeHierarchy, Type t,
+	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoWithTypeHierarchy(List<Type> typeHierarchy, Type t,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 
 		// check if type information can be created using a type factory
@@ -902,7 +902,7 @@ public class TypeExtractor {
 		throw new InvalidTypesException("Type Information could not be created.");
 	}
 
-	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, ArrayList<Type> returnTypeHierarchy,
+	private <IN1, IN2> TypeInformation<?> createTypeInfoFromInputs(TypeVariable<?> returnTypeVar, List<Type> returnTypeHierarchy,
 			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo) {
 
 		Type matReturnTypeVar = materializeTypeVariable(returnTypeHierarchy, returnTypeVar);
@@ -921,7 +921,7 @@ public class TypeExtractor {
 		}
 
 		// create a new type hierarchy for the input
-		ArrayList<Type> inputTypeHierarchy = new ArrayList<>();
+		List<Type> inputTypeHierarchy = new ArrayList<>();
 		// copy the function part of the type hierarchy
 		for (Type t : returnTypeHierarchy) {
 			Class<?> clazz = typeToClass(t);
@@ -965,11 +965,11 @@ public class TypeExtractor {
 	 * Return the type information for "returnTypeVar" given that "inType" has type information "inTypeInfo".
 	 * Thus "inType" must contain "returnTypeVar" in a "inputTypeHierarchy", otherwise null is returned.
 	 */
-	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
+	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, List<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
 		TypeInformation<?> info = null;
 
 		// use a factory to find corresponding type information to type variable
-		final ArrayList<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
+		final List<Type> factoryHierarchy = new ArrayList<>(inputTypeHierarchy);
 		final TypeInfoFactory<?> factory = getClosestFactory(factoryHierarchy, inType);
 		if (factory != null) {
 			// the type that defines the factory is last in factory hierarchy
@@ -1055,7 +1055,7 @@ public class TypeExtractor {
 					return getTypeOfPojoField(inTypeInfo, field);
 				}
 				else if (fieldType instanceof ParameterizedType || fieldType instanceof GenericArrayType) {
-					ArrayList<Type> typeHierarchyWithFieldType = new ArrayList<>(inputTypeHierarchy);
+					List<Type> typeHierarchyWithFieldType = new ArrayList<>(inputTypeHierarchy);
 					typeHierarchyWithFieldType.add(fieldType);
 					TypeInformation<?> foundInfo = createTypeInfoFromInput(returnTypeVar, typeHierarchyWithFieldType, fieldType, getTypeOfPojoField(inTypeInfo, field));
 					if (foundInfo != null) {
@@ -1081,7 +1081,7 @@ public class TypeExtractor {
 	 *     more subtypes (fields) that defined
 	 */
 	private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
-			ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, boolean lenient) {
+			List<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, boolean lenient) {
 		Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
 
 		// materialize possible type variables
@@ -1099,7 +1099,7 @@ public class TypeExtractor {
 
 		TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
 		for (int i = 0; i < subtypes.length; i++) {
-			final ArrayList<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
+			final List<Type> subTypeHierarchy = new ArrayList<>(typeHierarchy);
 			subTypeHierarchy.add(subtypes[i]);
 			// sub type could not be determined with materializing
 			// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
@@ -1153,9 +1153,9 @@ public class TypeExtractor {
 	 */
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfoFromFactory(
-			Type t, ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+			Type t, List<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 
-		final ArrayList<Type> factoryHierarchy = new ArrayList<>(typeHierarchy);
+		final List<Type> factoryHierarchy = new ArrayList<>(typeHierarchy);
 		final TypeInfoFactory<? super OUT> factory = getClosestFactory(factoryHierarchy, t);
 		if (factory == null) {
 			return null;
@@ -1194,7 +1194,7 @@ public class TypeExtractor {
 		return getParameterType(baseClass, null, clazz, pos);
 	}
 
-	private static Type getParameterType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Class<?> clazz, int pos) {
+	private static Type getParameterType(Class<?> baseClass, List<Type> typeHierarchy, Class<?> clazz, int pos) {
 		if (typeHierarchy != null) {
 			typeHierarchy.add(clazz);
 		}
@@ -1219,7 +1219,7 @@ public class TypeExtractor {
 						"Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point");
 	}
 
-	private static Type getParameterTypeFromGenericType(Class<?> baseClass, ArrayList<Type> typeHierarchy, Type t, int pos) {
+	private static Type getParameterTypeFromGenericType(Class<?> baseClass, List<Type> typeHierarchy, Type t, int pos) {
 		// base class
 		if (t instanceof ParameterizedType && baseClass.equals(((ParameterizedType) t).getRawType())) {
 			if (typeHierarchy != null) {
@@ -1249,7 +1249,7 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 
 	private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		List<Type> typeHierarchy = new ArrayList<>();
 
 		// try to get generic parameter
 		Type inType;
@@ -1269,7 +1269,7 @@ public class TypeExtractor {
 	}
 
 	@SuppressWarnings("unchecked")
-	private static void validateInfo(ArrayList<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
+	private static void validateInfo(List<Type> typeHierarchy, Type type, TypeInformation<?> typeInfo) {
 		if (type == null) {
 			throw new InvalidTypesException("Unknown Error. Type is null.");
 		}
@@ -1490,7 +1490,7 @@ public class TypeExtractor {
 	/**
 	 * @return number of items with equal type or same raw type
 	 */
-	private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
+	private static int countTypeInHierarchy(List<Type> typeHierarchy, Type type) {
 		int count = 0;
 		for (Type t : typeHierarchy) {
 			if (t == type || (isClassType(type) && t == typeToClass(type)) || (isClassType(t) && typeToClass(t) == type)) {
@@ -1507,7 +1507,7 @@ public class TypeExtractor {
 	 * @param t type for which a factory needs to be found
 	 * @return closest type information factory or null if there is no factory in the type hierarchy
 	 */
-	private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
+	private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(List<Type> typeHierarchy, Type t) {
 		TypeInfoFactory<OUT> factory = null;
 		while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
 			typeHierarchy.add(t);
@@ -1583,17 +1583,17 @@ public class TypeExtractor {
 	 * @return TypeInformation that describes the passed Class
 	 */
 	public static <X> TypeInformation<X> getForClass(Class<X> clazz) {
-		final ArrayList<Type> typeHierarchy = new ArrayList<>();
+		final List<Type> typeHierarchy = new ArrayList<>();
 		typeHierarchy.add(clazz);
 		return new TypeExtractor().privateGetForClass(clazz, typeHierarchy);
 	}
 
-	private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type> typeHierarchy) {
+	private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, List<Type> typeHierarchy) {
 		return privateGetForClass(clazz, typeHierarchy, null, null, null);
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+	private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, List<Type> typeHierarchy,
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 		checkNotNull(clazz);
 
@@ -1772,7 +1772,7 @@ public class TypeExtractor {
 	}
 
 	@SuppressWarnings("unchecked")
-	protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+	protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy,
 			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 
 		if (!Modifier.isPublic(clazz.getModifiers())) {
@@ -1809,7 +1809,7 @@ public class TypeExtractor {
 				return null;
 			}
 			try {
-				ArrayList<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
+				List<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
 				fieldTypeHierarchy.add(fieldType);
 				TypeInformation<?> ti = createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, in1Type, in2Type);
 				pojoFields.add(new PojoField(field, ti));
@@ -1937,7 +1937,7 @@ public class TypeExtractor {
 		checkNotNull(value);
 
 		// check if type information can be produced using a factory
-		final ArrayList<Type> typeHierarchy = new ArrayList<>();
+		final List<Type> typeHierarchy = new ArrayList<>();
 		typeHierarchy.add(value.getClass());
 		final TypeInformation<X> typeFromFactory = createTypeInfoFromFactory(value.getClass(), typeHierarchy, null, null);
 		if (typeFromFactory != null) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index a898fc7..0fcee4f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -64,7 +64,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 	@Internal
 	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
 			PojoTypeExtractor pte = new PojoTypeExtractor();
-			ArrayList<Type> typeHierarchy = new ArrayList<>();
+			List<Type> typeHierarchy = new ArrayList<>();
 			typeHierarchy.add(typeClass);
 			TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
 
@@ -96,7 +96,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 		}
 
 		@Override
-		public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+		public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy,
 				ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
 			return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type);
 		}


[flink] 02/04: [hotfix] Remove warnings in TypeExtractor, AvroTypeInfo

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3309d14b1abec8f9303f5158a57eee17c4de92d2
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 29 14:06:52 2020 +0200

    [hotfix] Remove warnings in TypeExtractor, AvroTypeInfo
---
 .../api/java/typeutils/TypeExtractionUtils.java    |   7 +-
 .../flink/api/java/typeutils/TypeExtractor.java    | 118 ++++++++++-----------
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  10 +-
 3 files changed, 63 insertions(+), 72 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index eef309e..600ea8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -257,12 +257,13 @@ public class TypeExtractionUtils {
 	/**
 	 * Convert ParameterizedType or Class to a Class.
 	 */
-	public static Class<?> typeToClass(Type t) {
+	@SuppressWarnings("unchecked")
+	public static <T> Class<T> typeToClass(Type t) {
 		if (t instanceof Class) {
-			return (Class<?>)t;
+			return (Class<T>) t;
 		}
 		else if (t instanceof ParameterizedType) {
-			return ((Class<?>) ((ParameterizedType) t).getRawType());
+			return ((Class<T>) ((ParameterizedType) t).getRawType());
 		}
 		throw new IllegalArgumentException("Cannot convert type to class");
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b94dd5f..a990f76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -146,7 +146,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) mapInterface,
+			mapInterface,
 			MapFunction.class,
 			0,
 			1,
@@ -167,7 +167,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) flatMapInterface,
+			flatMapInterface,
 			FlatMapFunction.class,
 			0,
 			1,
@@ -195,7 +195,7 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) foldInterface,
+			foldInterface,
 			FoldFunction.class,
 			0,
 			1,
@@ -251,7 +251,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) mapPartitionInterface,
+			mapPartitionInterface,
 			MapPartitionFunction.class,
 			0,
 			1,
@@ -271,7 +271,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) groupReduceInterface,
+			groupReduceInterface,
 			GroupReduceFunction.class,
 			0,
 			1,
@@ -291,7 +291,7 @@ public class TypeExtractor {
 																			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) combineInterface,
+			combineInterface,
 			GroupCombineFunction.class,
 			0,
 			1,
@@ -313,7 +313,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) joinInterface,
+			joinInterface,
 			FlatJoinFunction.class,
 			0,
 			1,
@@ -337,7 +337,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) joinInterface,
+			joinInterface,
 			JoinFunction.class,
 			0,
 			1,
@@ -361,7 +361,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) coGroupInterface,
+			coGroupInterface,
 			CoGroupFunction.class,
 			0,
 			1,
@@ -385,7 +385,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) crossInterface,
+			crossInterface,
 			CrossFunction.class,
 			0,
 			1,
@@ -407,7 +407,7 @@ public class TypeExtractor {
 			TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) selectorInterface,
+			selectorInterface,
 			KeySelector.class,
 			0,
 			1,
@@ -749,7 +749,7 @@ public class TypeExtractor {
 	// ----------------------------------- private methods ----------------------------------------
 
 	private TypeInformation<?> privateCreateTypeInfo(Type t) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 		typeHierarchy.add(t);
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
 	}
@@ -758,7 +758,7 @@ public class TypeExtractor {
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 		Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
 
 		TypeInformation<OUT> typeInfo;
@@ -773,16 +773,15 @@ public class TypeExtractor {
 		}
 
 		// get info from hierarchy
-		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+		return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 
 	// for LambdaFunctions
-	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 
 		// get info from hierarchy
-		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+		return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -823,14 +822,20 @@ public class TypeExtractor {
 			typeHierarchy.add(curT);
 
 			// create the type information for the subtypes
-			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
+			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(
+				t,
+				(ParameterizedType) curT,
+				typeHierarchy,
+				in1Type,
+				in2Type,
+				false);
 			// type needs to be treated a pojo due to additional fields
 			if (subTypesInfo == null) {
 				if (t instanceof ParameterizedType) {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
+					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
 				}
 				else {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
+					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), null, in1Type, in2Type);
 				}
 			}
 			// return tuple info
@@ -887,7 +892,7 @@ public class TypeExtractor {
 		}
 		// objects with generics are treated as Class first
 		else if (t instanceof ParameterizedType) {
-			return (TypeInformation<OUT>) privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
+			return privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
 		}
 		// no tuple, no TypeVariable, no generic type
 		else if (t instanceof Class) {
@@ -916,10 +921,11 @@ public class TypeExtractor {
 		}
 
 		// create a new type hierarchy for the input
-		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> inputTypeHierarchy = new ArrayList<>();
 		// copy the function part of the type hierarchy
 		for (Type t : returnTypeHierarchy) {
-			if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) {
+			Class<?> clazz = typeToClass(t);
+			if (isClassType(t) && Function.class.isAssignableFrom(clazz) && clazz != Function.class) {
 				inputTypeHierarchy.add(t);
 			}
 			else {
@@ -938,21 +944,17 @@ public class TypeExtractor {
 			// find the deepest type variable that describes the type of input 1
 			Type in1Type = baseClass.getActualTypeArguments()[0];
 
-			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in1Type, in1TypeInfo);
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in1Type, in1TypeInfo);
 		}
 
 		if (info == null && in2TypeInfo != null) {
 			// find the deepest type variable that describes the type of input 2
 			Type in2Type = baseClass.getActualTypeArguments()[1];
 
-			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in2Type, in2TypeInfo);
-		}
-
-		if (info != null) {
-			return info;
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in2Type, in2TypeInfo);
 		}
 
-		return null;
+		return info;
 	}
 
 	/**
@@ -963,7 +965,6 @@ public class TypeExtractor {
 	 * Return the type information for "returnTypeVar" given that "inType" has type information "inTypeInfo".
 	 * Thus "inType" must contain "returnTypeVar" in a "inputTypeHierarchy", otherwise null is returned.
 	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
 		TypeInformation<?> info = null;
 
@@ -1247,18 +1248,8 @@ public class TypeExtractor {
 	//  Validate input
 	// --------------------------------------------------------------------------------------------
 
-	private static void validateInputType(Type t, TypeInformation<?> inType) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
-		try {
-			validateInfo(typeHierarchy, t, inType);
-		}
-		catch(InvalidTypesException e) {
-			throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
-		}
-	}
-
 	private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 
 		// try to get generic parameter
 		Type inType;
@@ -1353,7 +1344,7 @@ public class TypeExtractor {
 				}
 
 				for (int i = 0; i < subTypes.length; i++) {
-					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
+					validateInfo(new ArrayList<>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
 				}
 			}
 			// check for primitive array
@@ -1426,7 +1417,7 @@ public class TypeExtractor {
 
 				TypeInformation<?> actual;
 				// check value type contents
-				if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
+				if (!typeInfo.equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
 					throw new InvalidTypesException("Value type '" + typeInfo + "' expected but was '" + actual + "'.");
 				}
 			}
@@ -1479,6 +1470,7 @@ public class TypeExtractor {
 	 * Returns the type information factory for a type using the factory registry or annotations.
 	 */
 	@Internal
+	@SuppressWarnings("unchecked")
 	public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
 		final Class<?> factoryClass;
 		if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
@@ -1516,7 +1508,7 @@ public class TypeExtractor {
 	 * @return closest type information factory or null if there is no factory in the type hierarchy
 	 */
 	private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
-		TypeInfoFactory factory = null;
+		TypeInfoFactory<OUT> factory = null;
 		while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
 			typeHierarchy.add(t);
 			factory = getTypeInfoFactory(t);
@@ -1545,7 +1537,7 @@ public class TypeExtractor {
 	 * Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards.
 	 * If a value could not be found it will return the most bottom type variable in the hierarchy.
 	 */
-	private static Type materializeTypeVariable(ArrayList<Type> typeHierarchy, TypeVariable<?> typeVar) {
+	private static Type materializeTypeVariable(List<Type> typeHierarchy, TypeVariable<?> typeVar) {
 		TypeVariable<?> inTypeTypeVar = typeVar;
 		// iterate thru hierarchy from top to bottom until type variable gets a class assigned
 		for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
@@ -1618,7 +1610,7 @@ public class TypeExtractor {
 
 		// Class is handled as generic type info
 		if (clazz.equals(Class.class)) {
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		// recursive types are handled as generic type info
@@ -1696,11 +1688,11 @@ public class TypeExtractor {
 
 		if (Modifier.isInterface(clazz.getModifiers())) {
 			// Interface has no members and is therefore not handled as POJO
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		try {
-			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
+			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type);
 			if (pojoType != null) {
 				return pojoType;
 			}
@@ -1712,7 +1704,7 @@ public class TypeExtractor {
 		}
 
 		// return a generic type
-		return new GenericTypeInfo<OUT>(clazz);
+		return new GenericTypeInfo<>(clazz);
 	}
 
 	/**
@@ -1725,7 +1717,7 @@ public class TypeExtractor {
 	 * @param clazz class of field
 	 * @param typeHierarchy type hierarchy for materializing generic types
 	 */
-	private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
+	private boolean isValidPojoField(Field f, Class<?> clazz, List<Type> typeHierarchy) {
 		if(Modifier.isPublic(f.getModifiers())) {
 			return true;
 		} else {
@@ -1751,14 +1743,14 @@ public class TypeExtractor {
 					// no arguments for the getter
 					m.getParameterTypes().length == 0 &&
 					// return type is same as field type (or the generic variant of it)
-					(m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+					(m.getGenericReturnType().equals( fieldType ) || (m.getReturnType().equals(fieldTypeWrapper)) || (m.getGenericReturnType().equals(fieldTypeGeneric)) )
 				) {
 					hasGetter = true;
 				}
 				// check for setters (<FieldName>_$eq for scala)
 				if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) &&
 					m.getParameterTypes().length == 1 && // one parameter of the field's type
-					(m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+					(m.getGenericParameterTypes()[0].equals( fieldType ) || (m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
 					// return type is void (or the class self).
 					(m.getReturnType().equals(Void.TYPE) || m.getReturnType().equals(clazz))
 				) {
@@ -1787,7 +1779,7 @@ public class TypeExtractor {
 			LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
 				"and must be processed as GenericType. Please read the Flink documentation " +
 				"on \"Data Types & Serialization\" for details of the effect on performance.");
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		// add the hierarchy of the POJO itself if it is generic
@@ -1804,10 +1796,10 @@ public class TypeExtractor {
 			LOG.info("No fields were detected for " + clazz + " so it cannot be used as a POJO type " +
 				"and must be processed as GenericType. Please read the Flink documentation " +
 				"on \"Data Types & Serialization\" for details of the effect on performance.");
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
-		List<PojoField> pojoFields = new ArrayList<PojoField>();
+		List<PojoField> pojoFields = new ArrayList<>();
 		for (Field field : fields) {
 			Type fieldType = field.getGenericType();
 			if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
@@ -1817,7 +1809,7 @@ public class TypeExtractor {
 				return null;
 			}
 			try {
-				ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				ArrayList<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
 				fieldTypeHierarchy.add(fieldType);
 				TypeInformation<?> ti = createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, in1Type, in2Type);
 				pojoFields.add(new PojoField(field, ti));
@@ -1826,11 +1818,11 @@ public class TypeExtractor {
 				if(isClassType(fieldType)) {
 					genericClass = typeToClass(fieldType);
 				}
-				pojoFields.add(new PojoField(field, new GenericTypeInfo<OUT>((Class<OUT>) genericClass)));
+				pojoFields.add(new PojoField(field, new GenericTypeInfo<>((Class<OUT>) genericClass)));
 			}
 		}
 
-		CompositeType<OUT> pojoType = new PojoTypeInfo<OUT>(clazz, pojoFields);
+		CompositeType<OUT> pojoType = new PojoTypeInfo<>(clazz, pojoFields);
 
 		//
 		// Validate the correctness of the pojo.
@@ -1848,7 +1840,7 @@ public class TypeExtractor {
 
 		// Try retrieving the default constructor, if it does not have one
 		// we cannot use this because the serializer uses it.
-		Constructor defaultConstructor = null;
+		Constructor<OUT> defaultConstructor = null;
 		try {
 			defaultConstructor = clazz.getDeclaredConstructor();
 		} catch (NoSuchMethodException e) {
@@ -1885,7 +1877,7 @@ public class TypeExtractor {
 	 */
 	@PublicEvolving
 	public static List<Field> getAllDeclaredFields(Class<?> clazz, boolean ignoreDuplicates) {
-		List<Field> result = new ArrayList<Field>();
+		List<Field> result = new ArrayList<>();
 		while (clazz != null) {
 			Field[] fields = clazz.getDeclaredFields();
 			for (Field field : fields) {
@@ -1982,7 +1974,7 @@ public class TypeExtractor {
 				if (row.getField(i) == null) {
 					LOG.warn("Cannot extract type of Row field, because of Row field[" + i + "] is null. " +
 						"Should define RowTypeInfo explicitly.");
-					return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+					return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
 				}
 			}
 			TypeInformation<?>[] typeArray = new TypeInformation<?>[arity];
@@ -1992,7 +1984,7 @@ public class TypeExtractor {
 			return (TypeInformation<X>) new RowTypeInfo(typeArray);
 		}
 		else {
-			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
 		}
 	}
 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 09ce186..a898fc7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -57,33 +57,31 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 	}
 
 	@Override
-	@SuppressWarnings("deprecation")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		return new AvroSerializer<>(getTypeClass());
 	}
 
-	@SuppressWarnings("unchecked")
 	@Internal
 	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
 			PojoTypeExtractor pte = new PojoTypeExtractor();
 			ArrayList<Type> typeHierarchy = new ArrayList<>();
 			typeHierarchy.add(typeClass);
-			TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+			TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
 
 			if (!(ti instanceof PojoTypeInfo)) {
 				throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
 			}
-			PojoTypeInfo pti =  (PojoTypeInfo) ti;
+			PojoTypeInfo<T> pti =  (PojoTypeInfo<T>) ti;
 			List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
 
 			for (int i = 0; i < pti.getArity(); i++) {
 				PojoField f = pti.getPojoFieldAt(i);
-				TypeInformation newType = f.getTypeInformation();
+				TypeInformation<?> newType = f.getTypeInformation();
 				// check if type is a CharSequence
 				if (newType instanceof GenericTypeInfo) {
 					if ((newType).getTypeClass().equals(CharSequence.class)) {
 						// replace the type by a org.apache.avro.util.Utf8
-						newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+						newType = new GenericTypeInfo<>(org.apache.avro.util.Utf8.class);
 					}
 				}
 				PojoField newField = new PojoField(f.getField(), newType);


[flink] 04/04: [FLINK-12175] Change filling of typeHierarchy in analyzePojo, for correctly creating fields TypeInfo

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8847061c40bf8ca17e22e6e412a378f53b8b82d
Author: Andrei Bulgakov <an...@gmail.com>
AuthorDate: Tue May 7 12:01:51 2019 +0300

    [FLINK-12175] Change filling of typeHierarchy in analyzePojo, for correctly creating fields TypeInfo
    
    Co-authored-by: Dawid Wysakowicz <dw...@apache.org>
---
 .../flink/api/java/typeutils/TypeExtractor.java    |  37 ++++----
 .../PojoParametrizedTypeExtractionTest.java        | 100 +++++++++++++++++++++
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  12 +--
 3 files changed, 123 insertions(+), 26 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 318f10d..4108ee2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -805,8 +805,9 @@ public class TypeExtractor {
 
 			// go up the hierarchy until we reach immediate child of Tuple (with or without generics)
 			// collect the types while moving up for a later top-down
+			List<Type> typeHierarchyForSubtypes = new ArrayList<>(typeHierarchy);
 			while (!(isClassType(curT) && typeToClass(curT).getSuperclass().equals(Tuple.class))) {
-				typeHierarchy.add(curT);
+				typeHierarchyForSubtypes.add(curT);
 				curT = typeToClass(curT).getGenericSuperclass();
 			}
 
@@ -819,24 +820,19 @@ public class TypeExtractor {
 				throw new InvalidTypesException("Tuple needs to be parameterized by using generics.");
 			}
 
-			typeHierarchy.add(curT);
+			typeHierarchyForSubtypes.add(curT);
 
 			// create the type information for the subtypes
 			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(
 				t,
 				(ParameterizedType) curT,
-				typeHierarchy,
+				typeHierarchyForSubtypes,
 				in1Type,
 				in2Type,
 				false);
 			// type needs to be treated a pojo due to additional fields
 			if (subTypesInfo == null) {
-				if (t instanceof ParameterizedType) {
-					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
-				}
-				else {
-					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), null, in1Type, in2Type);
-				}
+				return analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
 			}
 			// return tuple info
 			return new TupleTypeInfo(typeToClass(t), subTypesInfo);
@@ -1692,7 +1688,8 @@ public class TypeExtractor {
 		}
 
 		try {
-			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type);
+			Type t = parameterizedType != null ? parameterizedType : clazz;
+			TypeInformation<OUT> pojoType = analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
 			if (pojoType != null) {
 				return pojoType;
 			}
@@ -1772,9 +1769,13 @@ public class TypeExtractor {
 	}
 
 	@SuppressWarnings("unchecked")
-	protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy,
-			ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+	protected <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(
+			Type type,
+			List<Type> typeHierarchy,
+			TypeInformation<IN1> in1Type,
+			TypeInformation<IN2> in2Type) {
 
+		Class<OUT> clazz = typeToClass(type);
 		if (!Modifier.isPublic(clazz.getModifiers())) {
 			LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
 				"and must be processed as GenericType. Please read the Flink documentation " +
@@ -1782,14 +1783,8 @@ public class TypeExtractor {
 			return new GenericTypeInfo<>(clazz);
 		}
 
-		// add the hierarchy of the POJO itself if it is generic
-		if (parameterizedType != null) {
-			getTypeHierarchy(typeHierarchy, parameterizedType, Object.class);
-		}
-		// create a type hierarchy, if the incoming only contains the most bottom one or none.
-		else if (typeHierarchy.size() <= 1) {
-			getTypeHierarchy(typeHierarchy, clazz, Object.class);
-		}
+		// add the hierarchy of the POJO
+		getTypeHierarchy(typeHierarchy, type, Object.class);
 
 		List<Field> fields = getAllDeclaredFields(clazz, false);
 		if (fields.size() == 0) {
@@ -1950,7 +1945,7 @@ public class TypeExtractor {
 			int numFields = t.getArity();
 			if(numFields != countFieldsInClass(value.getClass())) {
 				// not a tuple since it has more fields.
-				return analyzePojo((Class<X>) value.getClass(), new ArrayList<Type>(), null, null, null); // we immediately call analyze Pojo here, because
+				return analyzePojo(value.getClass(), new ArrayList<>(), null, null); // we immediately call analyze Pojo here, because
 				// there is currently no other type that can handle such a class.
 			}
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java
new file mode 100644
index 0000000..bed0375
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoParametrizedTypeExtractionTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ *  Tests concerning type extraction of Parametrized Pojo and its superclasses.
+ */
+public class PojoParametrizedTypeExtractionTest {
+	@Test
+	public void testDirectlyCreateTypeInfo() {
+		final TypeInformation<ParameterizedParentImpl> directTypeInfo =
+			TypeExtractor.createTypeInfo(ParameterizedParentImpl.class);
+
+		assertThat(directTypeInfo, equalTo(getParameterizedParentTypeInformation()));
+	}
+
+	@Test
+	public void testMapReturnTypeInfo(){
+		TypeInformation<ParameterizedParentImpl> expectedTypeInfo = getParameterizedParentTypeInformation();
+
+		TypeInformation<ParameterizedParentImpl> mapReturnTypeInfo = TypeExtractor
+			.getMapReturnTypes(new ConcreteMapFunction(), Types.INT);
+
+		assertThat(mapReturnTypeInfo, equalTo(expectedTypeInfo));
+	}
+
+	private TypeInformation<ParameterizedParentImpl> getParameterizedParentTypeInformation() {
+		Map<String, TypeInformation<?>> nestedFields = new HashMap<>();
+		nestedFields.put("digits", Types.INT);
+		nestedFields.put("letters", Types.STRING);
+
+		Map<String, TypeInformation<?>> fields = new HashMap<>();
+		fields.put("precise", Types.DOUBLE);
+		fields.put("pojoField", Types.POJO(Pojo.class, nestedFields));
+
+		return Types.POJO(
+			ParameterizedParentImpl.class,
+			fields
+		);
+	}
+
+	/**
+	 * Representation of Pojo class with 2 fields.
+	 */
+	public static class Pojo {
+		public int digits;
+		public String letters;
+	}
+
+	/**
+	 * Representation of class which is parametrized by some pojo.
+	 */
+	public static class ParameterizedParent<T> {
+		public T pojoField;
+	}
+
+	/**
+	 * Implementation of ParametrizedParent parametrized by Pojo.
+	 */
+	public static class ParameterizedParentImpl extends ParameterizedParent<Pojo> {
+		public double precise;
+	}
+	/**
+	 * Representation of map function for type extraction.
+	 */
+	public static class ConcreteMapFunction implements MapFunction<Integer, ParameterizedParentImpl> {
+		@Override
+		public ParameterizedParentImpl map(Integer value) throws Exception {
+			return null;
+		}
+	}
+}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 0fcee4f..9f4b469 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.avro.specific.SpecificRecordBase;
 
-import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
@@ -66,7 +65,7 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 			PojoTypeExtractor pte = new PojoTypeExtractor();
 			List<Type> typeHierarchy = new ArrayList<>();
 			typeHierarchy.add(typeClass);
-			TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+			TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null);
 
 			if (!(ti instanceof PojoTypeInfo)) {
 				throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
@@ -96,9 +95,12 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 		}
 
 		@Override
-		public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, List<Type> typeHierarchy,
-				ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-			return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type);
+		public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(
+				Type type,
+				List<Type> typeHierarchy,
+				TypeInformation<IN1> in1Type,
+				TypeInformation<IN2> in2Type) {
+			return super.analyzePojo(type, typeHierarchy, in1Type, in2Type);
 		}
 	}
 }