You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/30 14:13:12 UTC
[flink] 02/04: [hotfix] Remove warnings in TypeExtractor,
AvroTypeInfo
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3309d14b1abec8f9303f5158a57eee17c4de92d2
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jul 29 14:06:52 2020 +0200
[hotfix] Remove warnings in TypeExtractor, AvroTypeInfo
---
.../api/java/typeutils/TypeExtractionUtils.java | 7 +-
.../flink/api/java/typeutils/TypeExtractor.java | 118 ++++++++++-----------
.../flink/formats/avro/typeutils/AvroTypeInfo.java | 10 +-
3 files changed, 63 insertions(+), 72 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index eef309e..600ea8e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -257,12 +257,13 @@ public class TypeExtractionUtils {
/**
* Convert ParameterizedType or Class to a Class.
*/
- public static Class<?> typeToClass(Type t) {
+ @SuppressWarnings("unchecked")
+ public static <T> Class<T> typeToClass(Type t) {
if (t instanceof Class) {
- return (Class<?>)t;
+ return (Class<T>) t;
}
else if (t instanceof ParameterizedType) {
- return ((Class<?>) ((ParameterizedType) t).getRawType());
+ return ((Class<T>) ((ParameterizedType) t).getRawType());
}
throw new IllegalArgumentException("Cannot convert type to class");
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index b94dd5f..a990f76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -146,7 +146,7 @@ public class TypeExtractor {
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) mapInterface,
+ mapInterface,
MapFunction.class,
0,
1,
@@ -167,7 +167,7 @@ public class TypeExtractor {
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) flatMapInterface,
+ flatMapInterface,
FlatMapFunction.class,
0,
1,
@@ -195,7 +195,7 @@ public class TypeExtractor {
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) foldInterface,
+ foldInterface,
FoldFunction.class,
0,
1,
@@ -251,7 +251,7 @@ public class TypeExtractor {
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) mapPartitionInterface,
+ mapPartitionInterface,
MapPartitionFunction.class,
0,
1,
@@ -271,7 +271,7 @@ public class TypeExtractor {
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) groupReduceInterface,
+ groupReduceInterface,
GroupReduceFunction.class,
0,
1,
@@ -291,7 +291,7 @@ public class TypeExtractor {
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) combineInterface,
+ combineInterface,
GroupCombineFunction.class,
0,
1,
@@ -313,7 +313,7 @@ public class TypeExtractor {
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
- (Function) joinInterface,
+ joinInterface,
FlatJoinFunction.class,
0,
1,
@@ -337,7 +337,7 @@ public class TypeExtractor {
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
- (Function) joinInterface,
+ joinInterface,
JoinFunction.class,
0,
1,
@@ -361,7 +361,7 @@ public class TypeExtractor {
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
- (Function) coGroupInterface,
+ coGroupInterface,
CoGroupFunction.class,
0,
1,
@@ -385,7 +385,7 @@ public class TypeExtractor {
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
- (Function) crossInterface,
+ crossInterface,
CrossFunction.class,
0,
1,
@@ -407,7 +407,7 @@ public class TypeExtractor {
TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType(
- (Function) selectorInterface,
+ selectorInterface,
KeySelector.class,
0,
1,
@@ -749,7 +749,7 @@ public class TypeExtractor {
// ----------------------------------- private methods ----------------------------------------
private TypeInformation<?> privateCreateTypeInfo(Type t) {
- ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+ ArrayList<Type> typeHierarchy = new ArrayList<>();
typeHierarchy.add(t);
return createTypeInfoWithTypeHierarchy(typeHierarchy, t, null, null);
}
@@ -758,7 +758,7 @@ public class TypeExtractor {
@SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+ ArrayList<Type> typeHierarchy = new ArrayList<>();
Type returnType = getParameterType(baseClass, typeHierarchy, clazz, returnParamPos);
TypeInformation<OUT> typeInfo;
@@ -773,16 +773,15 @@ public class TypeExtractor {
}
// get info from hierarchy
- return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+ return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
// for LambdaFunctions
- @SuppressWarnings("unchecked")
private <IN1, IN2, OUT> TypeInformation<OUT> privateCreateTypeInfo(Type returnType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
- ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+ ArrayList<Type> typeHierarchy = new ArrayList<>();
// get info from hierarchy
- return (TypeInformation<OUT>) createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
+ return createTypeInfoWithTypeHierarchy(typeHierarchy, returnType, in1Type, in2Type);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -823,14 +822,20 @@ public class TypeExtractor {
typeHierarchy.add(curT);
// create the type information for the subtypes
- final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
+ final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(
+ t,
+ (ParameterizedType) curT,
+ typeHierarchy,
+ in1Type,
+ in2Type,
+ false);
// type needs to be treated a pojo due to additional fields
if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
- return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
+ return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
else {
- return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
+ return analyzePojo(typeToClass(t), new ArrayList<>(typeHierarchy), null, in1Type, in2Type);
}
}
// return tuple info
@@ -887,7 +892,7 @@ public class TypeExtractor {
}
// objects with generics are treated as Class first
else if (t instanceof ParameterizedType) {
- return (TypeInformation<OUT>) privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
+ return privateGetForClass(typeToClass(t), typeHierarchy, (ParameterizedType) t, in1Type, in2Type);
}
// no tuple, no TypeVariable, no generic type
else if (t instanceof Class) {
@@ -916,10 +921,11 @@ public class TypeExtractor {
}
// create a new type hierarchy for the input
- ArrayList<Type> inputTypeHierarchy = new ArrayList<Type>();
+ ArrayList<Type> inputTypeHierarchy = new ArrayList<>();
// copy the function part of the type hierarchy
for (Type t : returnTypeHierarchy) {
- if (isClassType(t) && Function.class.isAssignableFrom(typeToClass(t)) && typeToClass(t) != Function.class) {
+ Class<?> clazz = typeToClass(t);
+ if (isClassType(t) && Function.class.isAssignableFrom(clazz) && clazz != Function.class) {
inputTypeHierarchy.add(t);
}
else {
@@ -938,21 +944,17 @@ public class TypeExtractor {
// find the deepest type variable that describes the type of input 1
Type in1Type = baseClass.getActualTypeArguments()[0];
- info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in1Type, in1TypeInfo);
+ info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in1Type, in1TypeInfo);
}
if (info == null && in2TypeInfo != null) {
// find the deepest type variable that describes the type of input 2
Type in2Type = baseClass.getActualTypeArguments()[1];
- info = createTypeInfoFromInput(returnTypeVar, new ArrayList<Type>(inputTypeHierarchy), in2Type, in2TypeInfo);
- }
-
- if (info != null) {
- return info;
+ info = createTypeInfoFromInput(returnTypeVar, new ArrayList<>(inputTypeHierarchy), in2Type, in2TypeInfo);
}
- return null;
+ return info;
}
/**
@@ -963,7 +965,6 @@ public class TypeExtractor {
* Return the type information for "returnTypeVar" given that "inType" has type information "inTypeInfo".
* Thus "inType" must contain "returnTypeVar" in a "inputTypeHierarchy", otherwise null is returned.
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
private <IN1> TypeInformation<?> createTypeInfoFromInput(TypeVariable<?> returnTypeVar, ArrayList<Type> inputTypeHierarchy, Type inType, TypeInformation<IN1> inTypeInfo) {
TypeInformation<?> info = null;
@@ -1247,18 +1248,8 @@ public class TypeExtractor {
// Validate input
// --------------------------------------------------------------------------------------------
- private static void validateInputType(Type t, TypeInformation<?> inType) {
- ArrayList<Type> typeHierarchy = new ArrayList<Type>();
- try {
- validateInfo(typeHierarchy, t, inType);
- }
- catch(InvalidTypesException e) {
- throw new InvalidTypesException("Input mismatch: " + e.getMessage(), e);
- }
- }
-
private static void validateInputType(Class<?> baseClass, Class<?> clazz, int inputParamPos, TypeInformation<?> inTypeInfo) {
- ArrayList<Type> typeHierarchy = new ArrayList<Type>();
+ ArrayList<Type> typeHierarchy = new ArrayList<>();
// try to get generic parameter
Type inType;
@@ -1353,7 +1344,7 @@ public class TypeExtractor {
}
for (int i = 0; i < subTypes.length; i++) {
- validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
+ validateInfo(new ArrayList<>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
}
// check for primitive array
@@ -1426,7 +1417,7 @@ public class TypeExtractor {
TypeInformation<?> actual;
// check value type contents
- if (!((ValueTypeInfo<?>) typeInfo).equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
+ if (!typeInfo.equals(actual = ValueTypeInfo.getValueTypeInfo((Class<? extends Value>) type))) {
throw new InvalidTypesException("Value type '" + typeInfo + "' expected but was '" + actual + "'.");
}
}
@@ -1479,6 +1470,7 @@ public class TypeExtractor {
* Returns the type information factory for a type using the factory registry or annotations.
*/
@Internal
+ @SuppressWarnings("unchecked")
public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
final Class<?> factoryClass;
if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
@@ -1516,7 +1508,7 @@ public class TypeExtractor {
* @return closest type information factory or null if there is no factory in the type hierarchy
*/
private static <OUT> TypeInfoFactory<? super OUT> getClosestFactory(ArrayList<Type> typeHierarchy, Type t) {
- TypeInfoFactory factory = null;
+ TypeInfoFactory<OUT> factory = null;
while (factory == null && isClassType(t) && !(typeToClass(t).equals(Object.class))) {
typeHierarchy.add(t);
factory = getTypeInfoFactory(t);
@@ -1545,7 +1537,7 @@ public class TypeExtractor {
* Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards.
* If a value could not be found it will return the most bottom type variable in the hierarchy.
*/
- private static Type materializeTypeVariable(ArrayList<Type> typeHierarchy, TypeVariable<?> typeVar) {
+ private static Type materializeTypeVariable(List<Type> typeHierarchy, TypeVariable<?> typeVar) {
TypeVariable<?> inTypeTypeVar = typeVar;
// iterate thru hierarchy from top to bottom until type variable gets a class assigned
for (int i = typeHierarchy.size() - 1; i >= 0; i--) {
@@ -1618,7 +1610,7 @@ public class TypeExtractor {
// Class is handled as generic type info
if (clazz.equals(Class.class)) {
- return new GenericTypeInfo<OUT>(clazz);
+ return new GenericTypeInfo<>(clazz);
}
// recursive types are handled as generic type info
@@ -1696,11 +1688,11 @@ public class TypeExtractor {
if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
- return new GenericTypeInfo<OUT>(clazz);
+ return new GenericTypeInfo<>(clazz);
}
try {
- TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
+ TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<>(typeHierarchy), parameterizedType, in1Type, in2Type);
if (pojoType != null) {
return pojoType;
}
@@ -1712,7 +1704,7 @@ public class TypeExtractor {
}
// return a generic type
- return new GenericTypeInfo<OUT>(clazz);
+ return new GenericTypeInfo<>(clazz);
}
/**
@@ -1725,7 +1717,7 @@ public class TypeExtractor {
* @param clazz class of field
* @param typeHierarchy type hierarchy for materializing generic types
*/
- private boolean isValidPojoField(Field f, Class<?> clazz, ArrayList<Type> typeHierarchy) {
+ private boolean isValidPojoField(Field f, Class<?> clazz, List<Type> typeHierarchy) {
if(Modifier.isPublic(f.getModifiers())) {
return true;
} else {
@@ -1751,14 +1743,14 @@ public class TypeExtractor {
// no arguments for the getter
m.getParameterTypes().length == 0 &&
// return type is same as field type (or the generic variant of it)
- (m.getGenericReturnType().equals( fieldType ) || (fieldTypeWrapper != null && m.getReturnType().equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericReturnType().equals(fieldTypeGeneric)) )
+ (m.getGenericReturnType().equals( fieldType ) || (m.getReturnType().equals(fieldTypeWrapper)) || (m.getGenericReturnType().equals(fieldTypeGeneric)) )
) {
hasGetter = true;
}
// check for setters (<FieldName>_$eq for scala)
if((methodNameLow.equals("set"+fieldNameLow) || methodNameLow.equals(fieldNameLow+"_$eq")) &&
m.getParameterTypes().length == 1 && // one parameter of the field's type
- (m.getGenericParameterTypes()[0].equals( fieldType ) || (fieldTypeWrapper != null && m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (fieldTypeGeneric != null && m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
+ (m.getGenericParameterTypes()[0].equals( fieldType ) || (m.getParameterTypes()[0].equals( fieldTypeWrapper )) || (m.getGenericParameterTypes()[0].equals(fieldTypeGeneric) ) )&&
// return type is void (or the class self).
(m.getReturnType().equals(Void.TYPE) || m.getReturnType().equals(clazz))
) {
@@ -1787,7 +1779,7 @@ public class TypeExtractor {
LOG.info("Class " + clazz.getName() + " is not public so it cannot be used as a POJO type " +
"and must be processed as GenericType. Please read the Flink documentation " +
"on \"Data Types & Serialization\" for details of the effect on performance.");
- return new GenericTypeInfo<OUT>(clazz);
+ return new GenericTypeInfo<>(clazz);
}
// add the hierarchy of the POJO itself if it is generic
@@ -1804,10 +1796,10 @@ public class TypeExtractor {
LOG.info("No fields were detected for " + clazz + " so it cannot be used as a POJO type " +
"and must be processed as GenericType. Please read the Flink documentation " +
"on \"Data Types & Serialization\" for details of the effect on performance.");
- return new GenericTypeInfo<OUT>(clazz);
+ return new GenericTypeInfo<>(clazz);
}
- List<PojoField> pojoFields = new ArrayList<PojoField>();
+ List<PojoField> pojoFields = new ArrayList<>();
for (Field field : fields) {
Type fieldType = field.getGenericType();
if(!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
@@ -1817,7 +1809,7 @@ public class TypeExtractor {
return null;
}
try {
- ArrayList<Type> fieldTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ ArrayList<Type> fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
fieldTypeHierarchy.add(fieldType);
TypeInformation<?> ti = createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, in1Type, in2Type);
pojoFields.add(new PojoField(field, ti));
@@ -1826,11 +1818,11 @@ public class TypeExtractor {
if(isClassType(fieldType)) {
genericClass = typeToClass(fieldType);
}
- pojoFields.add(new PojoField(field, new GenericTypeInfo<OUT>((Class<OUT>) genericClass)));
+ pojoFields.add(new PojoField(field, new GenericTypeInfo<>((Class<OUT>) genericClass)));
}
}
- CompositeType<OUT> pojoType = new PojoTypeInfo<OUT>(clazz, pojoFields);
+ CompositeType<OUT> pojoType = new PojoTypeInfo<>(clazz, pojoFields);
//
// Validate the correctness of the pojo.
@@ -1848,7 +1840,7 @@ public class TypeExtractor {
// Try retrieving the default constructor, if it does not have one
// we cannot use this because the serializer uses it.
- Constructor defaultConstructor = null;
+ Constructor<OUT> defaultConstructor = null;
try {
defaultConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
@@ -1885,7 +1877,7 @@ public class TypeExtractor {
*/
@PublicEvolving
public static List<Field> getAllDeclaredFields(Class<?> clazz, boolean ignoreDuplicates) {
- List<Field> result = new ArrayList<Field>();
+ List<Field> result = new ArrayList<>();
while (clazz != null) {
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
@@ -1982,7 +1974,7 @@ public class TypeExtractor {
if (row.getField(i) == null) {
LOG.warn("Cannot extract type of Row field, because of Row field[" + i + "] is null. " +
"Should define RowTypeInfo explicitly.");
- return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+ return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
}
}
TypeInformation<?>[] typeArray = new TypeInformation<?>[arity];
@@ -1992,7 +1984,7 @@ public class TypeExtractor {
return (TypeInformation<X>) new RowTypeInfo(typeArray);
}
else {
- return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
+ return privateGetForClass((Class<X>) value.getClass(), new ArrayList<>());
}
}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 09ce186..a898fc7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -57,33 +57,31 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
}
@Override
- @SuppressWarnings("deprecation")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
return new AvroSerializer<>(getTypeClass());
}
- @SuppressWarnings("unchecked")
@Internal
private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
PojoTypeExtractor pte = new PojoTypeExtractor();
ArrayList<Type> typeHierarchy = new ArrayList<>();
typeHierarchy.add(typeClass);
- TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+ TypeInformation<T> ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
if (!(ti instanceof PojoTypeInfo)) {
throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
}
- PojoTypeInfo pti = (PojoTypeInfo) ti;
+ PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;
List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
for (int i = 0; i < pti.getArity(); i++) {
PojoField f = pti.getPojoFieldAt(i);
- TypeInformation newType = f.getTypeInformation();
+ TypeInformation<?> newType = f.getTypeInformation();
// check if type is a CharSequence
if (newType instanceof GenericTypeInfo) {
if ((newType).getTypeClass().equals(CharSequence.class)) {
// replace the type by a org.apache.avro.util.Utf8
- newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+ newType = new GenericTypeInfo<>(org.apache.avro.util.Utf8.class);
}
}
PojoField newField = new PojoField(f.getField(), newType);