Posted to user@flink.apache.org by Fuyao Li <fu...@oracle.com> on 2022/05/12 16:45:58 UTC
Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Hi Weihua,
I am following all the standards mentioned here. The code structure is listed in the previous email.
@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;
    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}
class FieldDefinition {
    Metadata parentMetadata; // causes recursive resolution when type info is added
}
public class StringFieldDefinitionMapTypeInfoFactory extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<String, FieldDefinition>(TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}
public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}
I am using Lombok's @Data to generate the getters and setters. For List<>, Map<>, and Set<> fields, type erasure means I have to provide the type information through a type factory<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory> and type hints.
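To make the type erasure point concrete, here is a minimal plain-Java sketch with no Flink dependency. The Hint class is a hypothetical stand-in that only illustrates the general idea behind a type hint (an anonymous subclass keeps its type argument in its class signature); it is not Flink's TypeHint implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;

class ErasureDemo {

    // Hypothetical stand-in for a type hint: an anonymous subclass records
    // its type argument in its generic superclass signature.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // After erasure both sets share one runtime class, so the element type
    // cannot be read back from an instance.
    static boolean sameRuntimeClass() {
        Set<String> strings = new HashSet<>();
        Set<Integer> numbers = new HashSet<>();
        return strings.getClass() == numbers.getClass();
    }

    // The anonymous subclass keeps the full parameterized type.
    static String hintedTypeName() {
        return new Hint<Set<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(hintedTypeName());
    }
}
```

Capturing the type this way only tells a framework what the declared type is. Whether a dedicated serializer exists for that type is a separate question.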
I have two problems:
1. The main blocker is the recursive type resolution described below.
2. The Set<String> type hint does not work.
How can I define the TypeInformation for a Set? It does not seem to be supported among the Java types.
I provided the following factory and annotated this field in the POJO with it.
public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}
But I still get the following error:
Generic types have been disabled in the ExecutionConfig and type java.util.Set is treated as a generic type.
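One possible workaround, offered as an assumption and not something confirmed in this thread, is to avoid declaring a Set field at all and store the values as the keys of a Map, since MapTypeInfo is already used above for the fields map. The sketch below is plain Java; the class and method names are hypothetical, and the field would presumably still need a @TypeInfo factory that returns a MapTypeInfo, as in the existing code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class SetAsMapSketch {

    // Hypothetical replacement for the validColumns part of Metadata.
    public static class MetadataColumns {
        // Declared as a Map so that the field type is not java.util.Set.
        private Map<String, Boolean> validColumns = new LinkedHashMap<>();

        public Map<String, Boolean> getValidColumns() {
            return validColumns;
        }

        public void setValidColumns(Map<String, Boolean> validColumns) {
            this.validColumns = validColumns;
        }

        // Set-like helpers layered on top of the map's keys.
        public void addValidColumn(String name) {
            validColumns.put(name, Boolean.TRUE);
        }

        public boolean containsValidColumn(String name) {
            return validColumns.containsKey(name);
        }

        public Set<String> validColumnNames() {
            return Collections.unmodifiableSet(validColumns.keySet());
        }
    }
}
```

Adding the same column twice leaves a single key, so the set semantics are preserved while the declared field type stays a Map.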
Thanks,
Fuyao
From: Weihua Hu <hu...@gmail.com>
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li <fu...@oracle.com>
Cc: user <us...@flink.apache.org>
Subject: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Hi, Fuyao
How did you define these classes? There are some requirements for POJOs, as the Flink docs [1] state:
* The class must be public.
* It must have a public constructor without arguments (default constructor).
* All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().
* The type of a field must be supported by a registered serializer.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos
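The first three rules in the list above can be checked mechanically with reflection. The following is a minimal plain-Java sketch under stated assumptions: PojoRuleCheck, Good, and Bad are hypothetical names, static and synthetic fields are skipped by assumption, and this is not how Flink itself analyzes classes. The fourth rule (a registered serializer for each field type) depends on the framework and is not checked here.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class PojoRuleCheck {

    // Returns the violated rules; an empty list means rules 1 to 3 hold.
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Assumption: static and synthetic fields are not part of the record.
            if (Modifier.isStatic(mods) || field.isSynthetic()) {
                continue;
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(clazz, field)) {
                problems.add("field " + field.getName() + " has no getter/setter");
            }
        }
        return problems;
    }

    // For a field foo, looks for public getFoo() and setFoo(<field type>).
    static boolean hasAccessors(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical example classes.
    public static class Good {
        private String foo;
        public String getFoo() { return foo; }
        public void setFoo(String foo) { this.foo = foo; }
    }

    public static class Bad {
        private String foo;
        public Bad(String foo) { this.foo = foo; }
    }
}
```

Calling PojoRuleCheck.violations(Bad.class) reports the missing constructor and the missing accessors, while Good passes all three checks.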
Best,
Weihua
On May 12, 2022, at 3:03 AM, Fuyao Li <fu...@oracle.com> wrote:
Hi Community,
I have a POJO with a nested, recursively resolved structure. How should I define the @TypeInfo annotation to avoid a stack overflow exception when the application starts?
Basically,
class Metadata
    Map<String, FieldDefinition> fields
class FieldDefinition
    Metadata parentMetadata
The Metadata class gets resolved recursively, which causes the stack overflow. I had to design it this way because this is what the metadata model looks like.
Is there any way to fix this issue? Or must I treat this as a generic type and leave out the @TypeInfo annotation so that it falls back to Kryo? If I let it fall back to Kryo and remove the streamEnvironment.getConfig().disableGenericTypes(); statement, there is no problem during program startup.
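The cycle described above can be reproduced in plain Java without Flink. The sketch below is only an illustration of why eagerly resolving Metadata -> FieldDefinition -> Metadata never terminates, and of the usual cycle-breaking technique (remembering visited classes). RecursiveTypeWalk and its methods are hypothetical names; this is not Flink's type extraction code and it is not a fix for the Flink behavior.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class RecursiveTypeWalk {

    // Same shape as the classes in this thread.
    static class Metadata { Map<String, FieldDefinition> fields; }
    static class FieldDefinition { Metadata parentMetadata; }

    // Only descend into user classes, not JDK types, primitives, or arrays.
    static boolean isModelClass(Class<?> c) {
        return !c.isPrimitive() && !c.isArray() && !c.getName().startsWith("java.");
    }

    // Naive walk: resolves every field type eagerly and never remembers
    // what it has already visited.
    static void naiveWalk(Type type) {
        if (type instanceof ParameterizedType) {
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                naiveWalk(arg);
            }
        } else if (type instanceof Class && isModelClass((Class<?>) type)) {
            for (Field f : ((Class<?>) type).getDeclaredFields()) {
                naiveWalk(f.getGenericType());
            }
        }
    }

    // Guarded walk: a class that was already visited is not expanded again.
    static void guardedWalk(Type type, Set<Class<?>> seen) {
        if (type instanceof ParameterizedType) {
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                guardedWalk(arg, seen);
            }
        } else if (type instanceof Class && isModelClass((Class<?>) type)) {
            Class<?> c = (Class<?>) type;
            if (!seen.add(c)) {
                return; // cycle detected, stop here
            }
            for (Field f : c.getDeclaredFields()) {
                guardedWalk(f.getGenericType(), seen);
            }
        }
    }

    static boolean naiveWalkOverflows(Class<?> root) {
        try {
            naiveWalk(root);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    static Set<Class<?>> reachable(Class<?> root) {
        Set<Class<?>> seen = new LinkedHashSet<>();
        guardedWalk(root, seen);
        return seen;
    }
}
```

With these two classes the naive walk ends in a StackOverflowError, while the guarded walk visits Metadata and FieldDefinition once each and then stops.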
Thanks,
Fuyao
Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Posted by Fuyao Li <fu...@oracle.com>.
I updated the FieldDefinition class inline to avoid confusion. I am listing only a few of the fields in the class, not all of them. Everything follows the suggested POJO approach.
From: Fuyao Li <fu...@oracle.com>
Date: Thursday, May 12, 2022 at 09:46
To: Weihua Hu <hu...@gmail.com>
Cc: User <us...@flink.apache.org>
Subject: Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Hi Weihua,
I am following all the standards mentioned here. The code structure is listed in the previous email.
@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;
    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}
@Data
class FieldDefinition {
    private Metadata parentMetadata; // causes recursive resolution when type info is added
}