You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/30 14:13:12 UTC

[flink] 02/04: [hotfix] Remove warnings in TypeExtractor, AvroTypeInfo

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3309d14b1abec8f9303f5158a57eee17c4de92d2
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 29 14:06:52 2020 +0200

    [hotfix] Remove warnings in TypeExtractor, AvroTypeInfo
---
 .../api/java/typeutils/TypeExtractionUtils.java    |   7 +-
 .../flink/api/java/typeutils/TypeExtractor.java    | 118 ++++++++++-----------
 .../flink/formats/avro/typeutils/AvroTypeInfo.java |  10 +-
 3 files changed, 63 insertions(+), 72 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index eef309e..600ea8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -257,12 +257,13 @@ public class TypeExtractionUtils {
 	/**
 	 * Convert ParameterizedType or Class to a Class.
 	 */
-	public static Class<?> typeToClass(Type t) {
+	@SuppressWarnings("unchecked")
+	public static <T> Class<T> typeToClass(Type t) {
 		if (t instanceof Class) {
-			return (Class<?>)t;
+			return (Class<T>) t;
 		}
 		else if (t instanceof ParameterizedType) {
-			return ((Class<?>) ((ParameterizedType) t).getRawType());
+			return ((Class<T>) ((ParameterizedType) t).getRawType());
 		}
 		throw new IllegalArgumentException("Cannot convert type to class");
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b94dd5f..a990f76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -146,7 +146,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) mapInterface,
+			mapInterface,
 			MapFunction.class,
 			0,
 			1,
@@ -167,7 +167,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) flatMapInterface,
+			flatMapInterface,
 			FlatMapFunction.class,
 			0,
 			1,
@@ -195,7 +195,7 @@ public class TypeExtractor {
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) foldInterface,
+			foldInterface,
 			FoldFunction.class,
 			0,
 			1,
@@ -251,7 +251,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) mapPartitionInterface,
+			mapPartitionInterface,
 			MapPartitionFunction.class,
 			0,
 			1,
@@ -271,7 +271,7 @@ public class TypeExtractor {
 			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) groupReduceInterface,
+			groupReduceInterface,
 			GroupReduceFunction.class,
 			0,
 			1,
@@ -291,7 +291,7 @@ public class TypeExtractor {
 																			String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) combineInterface,
+			combineInterface,
 			GroupCombineFunction.class,
 			0,
 			1,
@@ -313,7 +313,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) joinInterface,
+			joinInterface,
 			FlatJoinFunction.class,
 			0,
 			1,
@@ -337,7 +337,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) joinInterface,
+			joinInterface,
 			JoinFunction.class,
 			0,
 			1,
@@ -361,7 +361,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) coGroupInterface,
+			coGroupInterface,
 			CoGroupFunction.class,
 			0,
 			1,
@@ -385,7 +385,7 @@ public class TypeExtractor {
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
 	{
 		return getBinaryOperatorReturnType(
-			(Function) crossInterface,
+			crossInterface,
 			CrossFunction.class,
 			0,
 			1,
@@ -407,7 +407,7 @@ public class TypeExtractor {
 			TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType(
-			(Function) selectorInterface,
+			selectorInterface,
 			KeySelector.class,
 			0,
 			1,
@@ -749,7 +749,7 @@ public class TypeExtractor {
 	// ----------------------------------- private methods ----------------------------------------
 
 	private TypeInformation<?> privateCreateTypeInfo(Type t) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 		typeHierarchy.add(t);
 		return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
 	}
@@ -758,7 +758,7 @@ public class TypeExtractor {
 	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
 			TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 		Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
 
 		TypeInformation<OUT> typeInfo;
@@ -773,16 +773,15 @@ public class TypeExtractor {
 		}
 
 		// get info from hierarchy
-		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+		return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 
 	// for LambdaFunctions
-	@SuppressWarnings("unchecked")
 	private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 
 		// get info from hierarchy
-		return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+		return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -823,14 +822,20 @@ public class TypeExtractor {
 			typeHierarchy.add(curT);
 
 			// create the type information for the subtypes
-			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
+			final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(
+				t,
+				(ParameterizedType) curT,
+				typeHierarchy,
+				in1Type,
+				in2Type,
+				false);
 			// type needs to be treated a pojo due to additional fields
 			if (subTypesInfo == null) {
 				if (t instanceof ParameterizedType) {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
+					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
 				}
 				else {
-					return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
+					return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), null, in1Type, in2Type);
 				}
 			}
 			// return tuple info
@@ -887,7 +892,7 @@ public class TypeExtractor {
 		}
 		// objects with generics are treated as Class first
 		else if (t instanceof ParameterizedType) {
-			return (TypeInformation<OUT>) privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
+			return privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
 		}
 		// no tuple, no TypeVariable, no generic type
 		else if (t instanceof Class) {
@@ -916,10 +921,11 @@ public class TypeExtractor {
 		}
 
 		// create a new type hierarchy for the input
-		ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> inputTypeHierarchy = new ArrayList<>();
 		// copy the function part of the type hierarchy
 		for (Type t : returnTypeHierarchy) {
-			if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) {
+			Class<?> clazz = typeToClass(t);
+			if (isClassType(t) && Function.class.isAssignableFrom(clazz) && clazz != Function.class) {
 				inputTypeHierarchy.add(t);
 			}
 			else {
@@ -938,21 +944,17 @@ public class TypeExtractor {
 			// find the deepest type variable that describes the type of input 1
 			Type in1Type = baseClass.getActualTypeArguments()[0];
 
-			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in1Type, in1TypeInfo);
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in1Type, in1TypeInfo);
 		}
 
 		if (info == null && in2TypeInfo != null) {
 			// find the deepest type variable that describes the type of input 2
 			Type in2Type = baseClass.getActualTypeArguments()[1];
 
-			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in2Type, in2TypeInfo);
-		}
-
-		if (info != null) {
-			return info;
+			info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in2Type, in2TypeInfo);
 		}
 
-		return null;
+		return info;
 	}
 
 	/**
@@ -963,7 +965,6 @@ public class TypeExtractor {
 	 * Return the type information for "returnTypeVar" given that "inType" has type information "inTypeInfo".
 	 * Thus "inType" must contain "returnTypeVar" in a "inputTypeHierarchy", otherwise null is returned.
 	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
 	private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
 		TypeInformation<?> info = null;
 
@@ -1247,18 +1248,8 @@ public class TypeExtractor {
 	//  Validate input
 	// --------------------------------------------------------------------------------------------
 
-	private static void validateInputType(Type t, TypeInformation<?> inType) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
-		try {
-			validateInfo(typeHierarchy, t, inType);
-		}
-		catch(InvalidTypesException e) {
-			throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
-		}
-	}
-
 	private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
-		ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
 
 		// try to get generic parameter
 		Type inType;
@@ -1353,7 +1344,7 @@ public class TypeExtractor {
 				}
 
 				for (int i = 0; i < subTypes.length; i++) {
-					validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
+					validateInfo(new ArrayList<>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
 				}
 			}
 			// check for primitive array
@@ -1426,7 +1417,7 @@ public class TypeExtractor {
 
 				TypeInformation<?> actual;
 				// check value type contents
-				if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
+				if (!typeInfo.equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
 					throw new InvalidTypesException("Value type '" + typeInfo + "' expected but was '" + actual + "'.");
 				}
 			}
@@ -1479,6 +1470,7 @@ public class TypeExtractor {
 	 * Returns the type information factory for a type using the factory registry or annotations.
 	 */
 	@Internal
+	@SuppressWarnings("unchecked")
 	public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
 		final Class<?> factoryClass;
 		if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
@@ -1516,7 +1508,7 @@ public class TypeExtractor {
 	 * @return closest type information factory or null if there is no factory in the type hierarchy
 	 */
 	private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
-		TypeInfoFactory factory = null;
+		TypeInfoFactory<OUT> factory = null;
 		while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
 			typeHierarchy.add(t);
 			factory = getTypeInfoFactory(t);
@@ -1545,7 +1537,7 @@ public class TypeExtractor {
 	 * Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards.
 	 * If a value could not be found it will return the most bottom type variable in the hierarchy.
 	 */
-	private static Type materializeTypeVariable(ArrayList<Type> typeHierarchy, TypeVariable<?> typeVar) {
+	private static Type materializeTypeVariable(List<Type> typeHierarchy, TypeVariable<?> typeVar) {
 		TypeVariable<?> inTypeTypeVar = typeVar;
 		// iterate thru hierarchy from top to bottom until type variable gets a class assigned
 		for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
@@ -1618,7 +1610,7 @@ public class TypeExtractor {
 
 		// Class is handled as generic type info
 		if (clazz.equals(Class.class)) {
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		// recursive types are handled as generic type info
@@ -1696,11 +1688,11 @@ public class TypeExtractor {
 
 		if (Modifier.isInterface(clazz.getModifiers())) {
 			// Interface has no members and is therefore not handled as POJO
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		try {
-			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
+			TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type);
 			if (pojoType != null) {
 				return pojoType;
 			}
@@ -1712,7 +1704,7 @@ public class TypeExtractor {
 		}
 
 		// return a generic type
-		return new GenericTypeInfo<OUT>(clazz);
+		return new GenericTypeInfo<>(clazz);
 	}
 
 	/**
@@ -1725,7 +1717,7 @@ public class TypeExtractor {
 	 * @param clazz class of field
 	 * @param typeHierarchy type hierarchy for materializing generic types
 	 */
-	private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
+	private boolean isValidPojoField(Field f, Class<?> clazz, List<Type> typeHierarchy) {
 		if(Modifier.isPublic(f.getModifiers())) {
 			return true;
 		} else {
@@ -1751,14 +1743,14 @@ public class TypeExtractor {
 					// no arguments for the getter
 					m.getParameterTypes().length == 0 &&
 					// return type is same as field type (or the generic variant of it)
-					(m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+					(m.getGenericReturnType().equals( fieldType ) || (m.getReturnType().equals(fieldTypeWrapper)) || (m.getGenericReturnType().equals(fieldTypeGeneric)) )
 				) {
 					hasGetter = true;
 				}
 				// check for setters (<FieldName>_$eq for scala)
 				if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) &&
 					m.getParameterTypes().length == 1 && // one parameter of the field's type
-					(m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+					(m.getGenericParameterTypes()[0].equals( fieldType ) || (m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
 					// return type is void (or the class self).
 					(m.getReturnType().equals(Void.TYPE) || m.getReturnType().equals(clazz))
 				) {
@@ -1787,7 +1779,7 @@ public class TypeExtractor {
 			LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
 				"and must be processed as GenericType. Please read the Flink documentation " +
 				"on \"Data Types & Serialization\" for details of the effect on performance.");
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
 		// add the hierarchy of the POJO itself if it is generic
@@ -1804,10 +1796,10 @@ public class TypeExtractor {
 			LOG.info("No fields were detected for " + clazz + " so it cannot be used as a POJO type " +
 				"and must be processed as GenericType. Please read the Flink documentation " +
 				"on \"Data Types & Serialization\" for details of the effect on performance.");
-			return new GenericTypeInfo<OUT>(clazz);
+			return new GenericTypeInfo<>(clazz);
 		}
 
-		List<PojoField> pojoFields = new ArrayList<PojoField>();
+		List<PojoField> pojoFields = new ArrayList<>();
 		for (Field field : fields) {
 			Type fieldType = field.getGenericType();
 			if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
@@ -1817,7 +1809,7 @@ public class TypeExtractor {
 				return null;
 			}
 			try {
-				ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				ArrayList<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
 				fieldTypeHierarchy.add(fieldType);
 				TypeInformation<?> ti = createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, in1Type, in2Type);
 				pojoFields.add(new PojoField(field, ti));
@@ -1826,11 +1818,11 @@ public class TypeExtractor {
 				if(isClassType(fieldType)) {
 					genericClass = typeToClass(fieldType);
 				}
-				pojoFields.add(new PojoField(field, new GenericTypeInfo<OUT>((Class<OUT>) genericClass)));
+				pojoFields.add(new PojoField(field, new GenericTypeInfo<>((Class<OUT>) genericClass)));
 			}
 		}
 
-		CompositeType<OUT> pojoType = new PojoTypeInfo<OUT>(clazz, pojoFields);
+		CompositeType<OUT> pojoType = new PojoTypeInfo<>(clazz, pojoFields);
 
 		//
 		// Validate the correctness of the pojo.
@@ -1848,7 +1840,7 @@ public class TypeExtractor {
 
 		// Try retrieving the default constructor, if it does not have one
 		// we cannot use this because the serializer uses it.
-		Constructor defaultConstructor = null;
+		Constructor<OUT> defaultConstructor = null;
 		try {
 			defaultConstructor = clazz.getDeclaredConstructor();
 		} catch (NoSuchMethodException e) {
@@ -1885,7 +1877,7 @@ public class TypeExtractor {
 	 */
 	@PublicEvolving
 	public static List<Field> getAllDeclaredFields(Class<?> clazz, boolean ignoreDuplicates) {
-		List<Field> result = new ArrayList<Field>();
+		List<Field> result = new ArrayList<>();
 		while (clazz != null) {
 			Field[] fields = clazz.getDeclaredFields();
 			for (Field field : fields) {
@@ -1982,7 +1974,7 @@ public class TypeExtractor {
 				if (row.getField(i) == null) {
 					LOG.warn("Cannot extract type of Row field, because of Row field[" + i + "] is null. " +
 						"Should define RowTypeInfo explicitly.");
-					return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+					return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
 				}
 			}
 			TypeInformation<?>[] typeArray = new TypeInformation<?>[arity];
@@ -1992,7 +1984,7 @@ public class TypeExtractor {
 			return (TypeInformation<X>) new RowTypeInfo(typeArray);
 		}
 		else {
-			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+			return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
 		}
 	}
 
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 09ce186..a898fc7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -57,33 +57,31 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 	}
 
 	@Override
-	@SuppressWarnings("deprecation")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		return new AvroSerializer<>(getTypeClass());
 	}
 
-	@SuppressWarnings("unchecked")
 	@Internal
 	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
 			PojoTypeExtractor pte = new PojoTypeExtractor();
 			ArrayList<Type> typeHierarchy = new ArrayList<>();
 			typeHierarchy.add(typeClass);
-			TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+			TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
 
 			if (!(ti instanceof PojoTypeInfo)) {
 				throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
 			}
-			PojoTypeInfo pti =  (PojoTypeInfo) ti;
+			PojoTypeInfo<T> pti =  (PojoTypeInfo<T>) ti;
 			List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
 
 			for (int i = 0; i < pti.getArity(); i++) {
 				PojoField f = pti.getPojoFieldAt(i);
-				TypeInformation newType = f.getTypeInformation();
+				TypeInformation<?> newType = f.getTypeInformation();
 				// check if type is a CharSequence
 				if (newType instanceof GenericTypeInfo) {
 					if ((newType).getTypeClass().equals(CharSequence.class)) {
 						// replace the type by a org.apache.avro.util.Utf8
-						newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+						newType = new GenericTypeInfo<>(org.apache.avro.util.Utf8.class);
 					}
 				}
 				PojoField newField = new PojoField(f.getField(), newType);