Posted to user@flink.apache.org by Fuyao Li <fu...@oracle.com> on 2022/05/12 16:45:58 UTC

Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO

Hi Weihua,

I am following all the standards mentioned here. The code structure is listed in the previous email.

@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;
    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

class FieldDefinition {
    Metadata parentMetadata; // causes recursive type resolution when type info is added
}

public class StringFieldDefinitionMapTypeInfoFactory extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<String, FieldDefinition>(
                TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok's @Data annotation to generate the getters and setters. For List<>, Map<>, and Set<> fields, because of type erasure, I have to provide the type information through a type factory <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory> and type hints.
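
To make the type erasure point concrete, here is a minimal, Flink-free sketch. TypeToken is a hypothetical stand-in that mimics the anonymous-subclass trick used by type hints; it is not Flink's TypeHint, and the class and method names are made up for illustration. It shows that the runtime class of a Set<String> carries no element type, while an anonymous subclass keeps the full parameterized type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Set;

// Toy illustration of type erasure. TypeToken is a hypothetical stand-in
// for the anonymous-subclass trick; it is not Flink's TypeHint.
final class ErasureSketch {

    /** Captures T from the generic superclass of an anonymous subclass. */
    abstract static class TypeToken<T> {
        final Type captured;

        protected TypeToken() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    /** What a runtime class lookup sees: only the raw class, no element type. */
    static String runtimeClassName(Set<String> set) {
        return set.getClass().getName();
    }

    /** What the token sees: the full parameterized type. */
    static String capturedTypeName() {
        return new TypeToken<Set<String>>() {}.captured.getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(runtimeClassName(new HashSet<>()));
        System.out.println(capturedTypeName());
    }
}
```

Capturing the parameterized type only recovers the element type. Whether a framework then has a dedicated serializer for that collection type is a separate question.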

I have two problems:

  1.  The main blocker is the recursive type resolution described below.
  2.  The Set<String> type hint doesn't work.

How can I define the TypeInformation for a Set? It does not seem to be supported as a Java type.

I provided something like this and then annotated this field in the POJO.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following error:
Generic types have been disabled in the ExecutionConfig and type java.util.Set is treated as a generic type.


Thanks,
Fuyao


From: Weihua Hu <hu...@gmail.com>
Date: Thursday, May 12, 2022 at 07:24
To: Fuyao Li <fu...@oracle.com>
Cc: user <us...@flink.apache.org>
Subject: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Hi, Fuyao

How did you define these classes? There are some requirements for POJOs, as the Flink docs [1] say:

  *   The class must be public.
  *   It must have a public constructor without arguments (default constructor).
  *   All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().
  *   The type of a field must be supported by a registered serializer.
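
As a rough illustration of the first three requirements above, the following sketch checks them with plain reflection. It is an approximation written for this thread, not Flink's own POJO analysis: it ignores the serializer requirement and boolean "is" getters, and the Reading and Broken classes are hypothetical examples.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Rough reflection check for the first three rules listed above.
// Illustrative approximation only, not Flink's own POJO analysis.
final class PojoRulesSketch {

    /** Hypothetical example class that satisfies the rules. */
    public static class Reading {
        public String sensorId;   // public field: no accessors needed
        private double value;     // private field: needs getter and setter

        public Reading() {}       // public no-argument constructor

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }
    }

    /** Hypothetical counter-example: private field without accessors. */
    public static class Broken {
        private int hidden;
        public Broken() {}
    }

    static boolean followsPojoRules(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;                               // rule 1: public class
        }
        try {
            type.getConstructor();                      // rule 2: public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {  // rule 3: public or getFoo/setFoo
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }
}
```

With these definitions, followsPojoRules(PojoRulesSketch.Reading.class) is true and followsPojoRules(PojoRulesSketch.Broken.class) is false.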

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos



Best,
Weihua


On May 12, 2022, at 3:03 AM, Fuyao Li <fu...@oracle.com> wrote:

Hi Community,

I have a POJO with a nested structure that is resolved recursively. How should I define the @TypeInfo annotation correctly to avoid a stack overflow exception when starting the application?

Basically,
class Metadata {
    Map<String, FieldDefinition> fields;
}

class FieldDefinition {
    Metadata parentMetadata;
}

The Metadata class gets resolved recursively, which causes a stack overflow. I had to design it this way because this is what the metadata structure model looks like.
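
The failure mode can be reproduced with a toy model that has nothing to do with Flink's internals. The sketch below assumes a resolver that starts a fresh resolution for every field type, which is one plausible reading of what happens here and is not confirmed. All names are hypothetical. The naive version never terminates on the Metadata <-> FieldDefinition cycle, while the version that remembers the types it has already seen does.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of resolving field types over a cyclic class graph.
// It is NOT Flink's type extraction; all names are hypothetical.
final class CyclicResolutionSketch {

    // type name -> types of its fields (Metadata <-> FieldDefinition forms a cycle)
    static final Map<String, List<String>> FIELDS = Map.of(
            "Metadata", List.of("FieldDefinition", "String"),
            "FieldDefinition", List.of("Metadata"),
            "String", List.of());

    /** Naive resolver: each field starts a fresh resolution, so a cycle never ends. */
    static int resolveNaively(String type) {
        int resolved = 1;
        for (String field : FIELDS.get(type)) {
            resolved += resolveNaively(field);
        }
        return resolved;
    }

    /** Cycle-aware resolver: a type that was already seen is not expanded again. */
    static int resolveWithGuard(String type, Set<String> seen) {
        if (!seen.add(type)) {
            return 0; // back-reference: stop here instead of recursing
        }
        int resolved = 1;
        for (String field : FIELDS.get(type)) {
            resolved += resolveWithGuard(field, seen);
        }
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(resolveWithGuard("Metadata", new HashSet<>()));
        try {
            resolveNaively("Metadata");
        } catch (StackOverflowError e) {
            System.out.println("naive resolution overflowed the stack");
        }
    }
}
```

The guard works only because the set of seen types is shared across the whole traversal; a resolver that drops that context at each step behaves like the naive one.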

Is there any way to fix this issue? Or must I treat this as a generic type and not add the @TypeInfo annotation, so that it falls back to Kryo? If I let it fall back to Kryo and remove the streamEnvironment.getConfig().disableGenericTypes(); statement, there is no problem during program startup.

Thanks,
Fuyao


Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO

Posted by Fuyao Li <fu...@oracle.com>.
I updated the FieldDefinition class inline to avoid confusion. I am listing only a few of the fields in each class, not all of them. Everything follows the suggested POJO approach.

From: Fuyao Li <fu...@oracle.com>
Date: Thursday, May 12, 2022 at 09:46
To: Weihua Hu <hu...@gmail.com>
Cc: User <us...@flink.apache.org>
Subject: Re: [External] : Re: How to define TypeInformation for Flink recursive resolved POJO
Hi Weihua,

I am following all the standards mentioned here. The code structure is listed in the previous email.

@Data
class Metadata {
    @TypeInfo(StringFieldDefinitionMapTypeInfoFactory.class)
    Map<String, FieldDefinition> fields;
    @TypeInfo(StringSetTypeInfoFactory.class)
    private Set<String> validColumns = new HashSet<>();
}

@Data
class FieldDefinition {
    private Metadata parentMetadata; // causes recursive type resolution when type info is added
}

public class StringFieldDefinitionMapTypeInfoFactory extends TypeInfoFactory<Map<String, FieldDefinition>> {
    @Override
    public TypeInformation<Map<String, FieldDefinition>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return new MapTypeInfo<String, FieldDefinition>(
                TypeInformation.of(String.class), TypeInformation.of(FieldDefinition.class));
    }
}

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}



I am using Lombok's @Data annotation to generate the getters and setters. For List<>, Map<>, and Set<> fields, because of type erasure, I have to provide the type information through a type factory <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory> and type hints.

I have two problems:

  1.  The main blocker is the recursive type resolution described below.
  2.  The Set<String> type hint doesn't work.

How can I define the TypeInformation for a Set? It does not seem to be supported as a Java type.

I provided something like this and then annotated this field in the POJO.

public class StringSetTypeInfoFactory extends TypeInfoFactory<Set<String>> {
    @Override
    public TypeInformation<Set<String>> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
        return TypeInformation.of(new TypeHint<Set<String>>() {});
    }
}

But I still get the following error:
Generic types have been disabled in the ExecutionConfig and type java.util.Set is treated as a generic type.


Thanks,
Fuyao

