You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/12/14 14:27:52 UTC

How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Hi,

The following program compiles and runs w/o exceptions:

> public class Test {
>
>   public static class A {
>     private int n;
>
>     public A() { }
>     public int getN() {  return n;  }
>     public void setN(int n) {  this.n = n;  }
>   }
>
>   public static class B {
>     private List<A> lst;
>
>     public B() { }
>     public List<A> getLst() {  return lst;  }
>     public void setLst(List<A> lst) {  this.lst = lst;  }
>   }
>
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
>
>     env.fromElements(new B())
>       .print();
>
>     env.execute();
>   }
> }
>

When I add the following line,

> env.getConfig().disableGenericTypes();

then the program shows me an exception:

> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.List is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
> at Test.main(Test.java:29)


To avoid this exception, I found that I have to declare a type factory like:

>   public static class BTypeFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
> TypeInformation<?>> genericParameters) {
>       return Types.POJO(
>         B.class,
>         ImmutableMap.<String, TypeInformation<?>>builder()
>           .put("lst", Types.LIST(Types.POJO(A.class)))
>         .build()
>       );
>     }
>   }

and give it to class B as follows:

>   @TypeInfo(BTypeFactory.class)
>   public static class B {


Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member
variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Arvid,

Okay, I'd better get back to Avro as you suggested!
Thanks for the tips regarding Avro.

Best,

Dongwon

On Wed, Jan 13, 2021 at 3:28 AM Arvid Heise <ar...@ververica.com> wrote:

> Do you think Scala is a better option in that regard?
>
> I'm not sure if scala is better in this regard. Sure you could use sealed
> classes but I don't know if the schema inference is using it. Maybe @Timo
> Walther <tw...@apache.org> knows more?
>
> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>>
> If it's just about annotations: You can use custom Java annotations with
> Avro generated classes by setting "javaAnnotations" property. [1] You can
> use it on records and fields.
> You can even provide your own velocity template [2] to add more features
> to the generation.
>
> [1]
> https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
> [2]
> https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
>
> On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <ea...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the very detailed explanation and tips!
>>
>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>>
>> Do you think Scala is a better option in that regard?
>>
>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle for a serializer [2]. Then,
>>> create the schema and let the classes be generated (on-the-fly).
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything in a Row has one of the most
>>> memory efficient representations.
>>
>> I have to work with DataStream API as I need a custom trigger which is
>> not supported in Table API AFAIK.
>>
>> If Table API is not sufficient, I fall back to Avro and use Avro specific
>>> records [3].
>>
>> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>>
>> So it seems like I'd better satisfy with BTypeFactory in my original
>> question unless I'm willing to move to Scala or Avro, huh?
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>> It gets especially bad if you consider that your classes evolve over
>>> time. What happens if A is first final, but you later decide to subclass
>>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>>> Kryo for most cases, which is why you need generic types. However, that is
>>> rather inefficient unless you register all possible classes beforehand [1].
>>>
>>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle for a serializer [2]. Then,
>>> create the schema and let the classes be generated (on-the-fly).
>>>
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything in a Row has one of the most
>>> memory efficient representations. If Table API is not sufficient, I fall
>>> back to Avro and use Avro specific records [3].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>>> [2]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>>
>>>
>>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <ea...@gmail.com>
>>> wrote:
>>>
>>>> Any advice would be appreciated :)
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Dongwon Kim <ea...@gmail.com>
>>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>>> To: user <us...@flink.apache.org>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> The following program compiles and runs w/o exceptions:
>>>>
>>>>> public class Test {
>>>>>
>>>>>   public static class A {
>>>>>     private int n;
>>>>>
>>>>>     public A() { }
>>>>>     public int getN() {  return n;  }
>>>>>     public void setN(int n) {  this.n = n;  }
>>>>>   }
>>>>>
>>>>>   public static class B {
>>>>>     private List<A> lst;
>>>>>
>>>>>     public B() { }
>>>>>     public List<A> getLst() {  return lst;  }
>>>>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     StreamExecutionEnvironment env =
>>>>> StreamExecutionEnvironment.createLocalEnvironment();
>>>>>
>>>>>     env.fromElements(new B())
>>>>>       .print();
>>>>>
>>>>>     env.execute();
>>>>>   }
>>>>> }
>>>>>
>>>>
>>>> When I add the following line,
>>>>
>>>>> env.getConfig().disableGenericTypes();
>>>>
>>>> then the program shows me an exception:
>>>>
>>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>>> Generic types have been disabled in the ExecutionConfig and type
>>>>> java.util.List is treated as a generic type.
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>>> at Test.main(Test.java:29)
>>>>
>>>>
>>>> To avoid this exception, I found that I have to declare a type factory
>>>> like:
>>>>
>>>>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>>     @Override
>>>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
>>>>> TypeInformation<?>> genericParameters) {
>>>>>       return Types.POJO(
>>>>>         B.class,
>>>>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>>>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>>         .build()
>>>>>       );
>>>>>     }
>>>>>   }
>>>>
>>>> and give it to class B as follows:
>>>>
>>>>>   @TypeInfo(BTypeFactory.class)
>>>>>   public static class B {
>>>>
>>>>
>>>> Is there no other way but to declare BTypeFactory in such cases?
>>>> I don't like the way I have to type a field name twice, one for a
>>>> member variable and the other for an Map entry in TypeInfoFactory.
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Posted by Arvid Heise <ar...@ververica.com>.
>
> Do you think Scala is a better option in that regard?

I'm not sure if scala is better in this regard. Sure you could use sealed
classes but I don't know if the schema inference is using it. Maybe @Timo
Walther <tw...@apache.org> knows more?

I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).
>
If it's just about annotations: You can use custom Java annotations with
Avro generated classes by setting "javaAnnotations" property. [1] You can
use it on records and fields.
You can even provide your own velocity template [2] to add more features to
the generation.

[1]
https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
[2]
https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm

On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the very detailed explanation and tips!
>
> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out the box as you are only using <A> as the
>> type of the list, right? However, there is no way of knowing if you didn't
>> use a subclass of A. Of course, if A was final, it might be possible to
>> exclude this case but you quickly go down a rabbit hole.
>>
> Do you think Scala is a better option in that regard?
>
> To avoid such limitations, I always recommend schema-first approaches when
>> moving a PoC to production code. First figure out what kind of data you
>> actually want to transfer. Then, settle for a serializer [2]. Then, create
>> the schema and let the classes be generated (on-the-fly).
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything in a Row has one of the most
>> memory efficient representations.
>
> I have to work with DataStream API as I need a custom trigger which is not
> supported in Table API AFAIK.
>
> If Table API is not sufficient, I fall back to Avro and use Avro specific
>> records [3].
>
> I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).
>
> So it seems like I'd better satisfy with BTypeFactory in my original
> question unless I'm willing to move to Scala or Avro, huh?
>
> Best,
>
> Dongwon
>
> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Dongwon,
>>
>> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out the box as you are only using <A> as the
>> type of the list, right? However, there is no way of knowing if you didn't
>> use a subclass of A. Of course, if A was final, it might be possible to
>> exclude this case but you quickly go down a rabbit hole.
>> It gets especially bad if you consider that your classes evolve over
>> time. What happens if A is first final, but you later decide to subclass
>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>> Kryo for most cases, which is why you need generic types. However, that is
>> rather inefficient unless you register all possible classes beforehand [1].
>>
>> To avoid such limitations, I always recommend schema-first approaches
>> when moving a PoC to production code. First figure out what kind of data
>> you actually want to transfer. Then, settle for a serializer [2]. Then,
>> create the schema and let the classes be generated (on-the-fly).
>>
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything in a Row has one of the most
>> memory efficient representations. If Table API is not sufficient, I fall
>> back to Avro and use Avro specific records [3].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>> [2]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>
>>
>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <ea...@gmail.com> wrote:
>>
>>> Any advice would be appreciated :)
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>> ---------- Forwarded message ---------
>>> From: Dongwon Kim <ea...@gmail.com>
>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>> To: user <us...@flink.apache.org>
>>>
>>>
>>> Hi,
>>>
>>> The following program compiles and runs w/o exceptions:
>>>
>>>> public class Test {
>>>>
>>>>   public static class A {
>>>>     private int n;
>>>>
>>>>     public A() { }
>>>>     public int getN() {  return n;  }
>>>>     public void setN(int n) {  this.n = n;  }
>>>>   }
>>>>
>>>>   public static class B {
>>>>     private List<A> lst;
>>>>
>>>>     public B() { }
>>>>     public List<A> getLst() {  return lst;  }
>>>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>>>   }
>>>>
>>>>   public static void main(String[] args) throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>     env.fromElements(new B())
>>>>       .print();
>>>>
>>>>     env.execute();
>>>>   }
>>>> }
>>>>
>>>
>>> When I add the following line,
>>>
>>>> env.getConfig().disableGenericTypes();
>>>
>>> then the program shows me an exception:
>>>
>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> Generic types have been disabled in the ExecutionConfig and type
>>>> java.util.List is treated as a generic type.
>>>> at
>>>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>> at
>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>> at
>>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>> at Test.main(Test.java:29)
>>>
>>>
>>> To avoid this exception, I found that I have to declare a type factory
>>> like:
>>>
>>>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>     @Override
>>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
>>>> TypeInformation<?>> genericParameters) {
>>>>       return Types.POJO(
>>>>         B.class,
>>>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>         .build()
>>>>       );
>>>>     }
>>>>   }
>>>
>>> and give it to class B as follows:
>>>
>>>>   @TypeInfo(BTypeFactory.class)
>>>>   public static class B {
>>>
>>>
>>> Is there no other way but to declare BTypeFactory in such cases?
>>> I don't like the way I have to type a field name twice, one for a member
>>> variable and the other for an Map entry in TypeInfoFactory.
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Arvid,

Thanks for the very detailed explanation and tips!

inferring the type information of Java classes is quite messy. At first, it
> seems like that should work out the box as you are only using <A> as the
> type of the list, right? However, there is no way of knowing if you didn't
> use a subclass of A. Of course, if A was final, it might be possible to
> exclude this case but you quickly go down a rabbit hole.
>
Do you think Scala is a better option in that regard?

To avoid such limitations, I always recommend schema-first approaches when
> moving a PoC to production code. First figure out what kind of data you
> actually want to transfer. Then, settle for a serializer [2]. Then, create
> the schema and let the classes be generated (on-the-fly).
> I usually do it in two ways: if I write a rather generic program, I try to
> use Table API, which optimizes everything in a Row has one of the most
> memory efficient representations.

I have to work with DataStream API as I need a custom trigger which is not
supported in Table API AFAIK.

If Table API is not sufficient, I fall back to Avro and use Avro specific
> records [3].

I used to define Avro records in .avro files and generate Java classes to
use in my Flink application, but recently decided to move to POJOs for some
reasons (e.g. custom Java annotations, etc).

So it seems like I'd better satisfy with BTypeFactory in my original
question unless I'm willing to move to Scala or Avro, huh?

Best,

Dongwon

On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Dongwon,
>
> inferring the type information of Java classes is quite messy. At first,
> it seems like that should work out the box as you are only using <A> as the
> type of the list, right? However, there is no way of knowing if you didn't
> use a subclass of A. Of course, if A was final, it might be possible to
> exclude this case but you quickly go down a rabbit hole.
> It gets especially bad if you consider that your classes evolve over time.
> What happens if A is first final, but you later decide to subclass it? How
> should Flink map old data to the new hierarchy? Flink falls back to Kryo
> for most cases, which is why you need generic types. However, that is
> rather inefficient unless you register all possible classes beforehand [1].
>
> To avoid such limitations, I always recommend schema-first approaches when
> moving a PoC to production code. First figure out what kind of data you
> actually want to transfer. Then, settle for a serializer [2]. Then, create
> the schema and let the classes be generated (on-the-fly).
>
> I usually do it in two ways: if I write a rather generic program, I try to
> use Table API, which optimizes everything in a Row has one of the most
> memory efficient representations. If Table API is not sufficient, I fall
> back to Avro and use Avro specific records [3].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
> [2]
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>
>
> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Any advice would be appreciated :)
>>
>> Thanks,
>>
>> Dongwon
>>
>> ---------- Forwarded message ---------
>> From: Dongwon Kim <ea...@gmail.com>
>> Date: Mon, Dec 14, 2020 at 11:27 PM
>> Subject: How to gracefully avoid "Generic types have been disabled in the
>> ExecutionConfig and type java.util.List is treated as a generic type"?
>> To: user <us...@flink.apache.org>
>>
>>
>> Hi,
>>
>> The following program compiles and runs w/o exceptions:
>>
>>> public class Test {
>>>
>>>   public static class A {
>>>     private int n;
>>>
>>>     public A() { }
>>>     public int getN() {  return n;  }
>>>     public void setN(int n) {  this.n = n;  }
>>>   }
>>>
>>>   public static class B {
>>>     private List<A> lst;
>>>
>>>     public B() { }
>>>     public List<A> getLst() {  return lst;  }
>>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>>   }
>>>
>>>   public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.createLocalEnvironment();
>>>
>>>     env.fromElements(new B())
>>>       .print();
>>>
>>>     env.execute();
>>>   }
>>> }
>>>
>>
>> When I add the following line,
>>
>>> env.getConfig().disableGenericTypes();
>>
>> then the program shows me an exception:
>>
>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>> Generic types have been disabled in the ExecutionConfig and type
>>> java.util.List is treated as a generic type.
>>> at
>>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>> at
>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>> at
>>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>> at Test.main(Test.java:29)
>>
>>
>> To avoid this exception, I found that I have to declare a type factory
>> like:
>>
>>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>>     @Override
>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
>>> TypeInformation<?>> genericParameters) {
>>>       return Types.POJO(
>>>         B.class,
>>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>>         .build()
>>>       );
>>>     }
>>>   }
>>
>> and give it to class B as follows:
>>
>>>   @TypeInfo(BTypeFactory.class)
>>>   public static class B {
>>
>>
>> Is there no other way but to declare BTypeFactory in such cases?
>> I don't like the way I have to type a field name twice, one for a member
>> variable and the other for an Map entry in TypeInfoFactory.
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Dongwon,

inferring the type information of Java classes is quite messy. At first, it
seems like that should work out the box as you are only using <A> as the
type of the list, right? However, there is no way of knowing if you didn't
use a subclass of A. Of course, if A was final, it might be possible to
exclude this case but you quickly go down a rabbit hole.
It gets especially bad if you consider that your classes evolve over time.
What happens if A is first final, but you later decide to subclass it? How
should Flink map old data to the new hierarchy? Flink falls back to Kryo
for most cases, which is why you need generic types. However, that is
rather inefficient unless you register all possible classes beforehand [1].

To avoid such limitations, I always recommend schema-first approaches when
moving a PoC to production code. First figure out what kind of data you
actually want to transfer. Then, settle for a serializer [2]. Then, create
the schema and let the classes be generated (on-the-fly).

I usually do it in two ways: if I write a rather generic program, I try to
use Table API, which optimizes everything in a Row has one of the most
memory efficient representations. If Table API is not sufficient, I fall
back to Avro and use Avro specific records [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[2]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://avro.apache.org/docs/current/gettingstartedjava.html


On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <ea...@gmail.com> wrote:

> Any advice would be appreciated :)
>
> Thanks,
>
> Dongwon
>
> ---------- Forwarded message ---------
> From: Dongwon Kim <ea...@gmail.com>
> Date: Mon, Dec 14, 2020 at 11:27 PM
> Subject: How to gracefully avoid "Generic types have been disabled in the
> ExecutionConfig and type java.util.List is treated as a generic type"?
> To: user <us...@flink.apache.org>
>
>
> Hi,
>
> The following program compiles and runs w/o exceptions:
>
>> public class Test {
>>
>>   public static class A {
>>     private int n;
>>
>>     public A() { }
>>     public int getN() {  return n;  }
>>     public void setN(int n) {  this.n = n;  }
>>   }
>>
>>   public static class B {
>>     private List<A> lst;
>>
>>     public B() { }
>>     public List<A> getLst() {  return lst;  }
>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment();
>>
>>     env.fromElements(new B())
>>       .print();
>>
>>     env.execute();
>>   }
>> }
>>
>
> When I add the following line,
>
>> env.getConfig().disableGenericTypes();
>
> then the program shows me an exception:
>
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Generic types have been disabled in the ExecutionConfig and type
>> java.util.List is treated as a generic type.
>> at
>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>> at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>> at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>> at Test.main(Test.java:29)
>
>
> To avoid this exception, I found that I have to declare a type factory
> like:
>
>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>     @Override
>>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
>> TypeInformation<?>> genericParameters) {
>>       return Types.POJO(
>>         B.class,
>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>         .build()
>>       );
>>     }
>>   }
>
> and give it to class B as follows:
>
>>   @TypeInfo(BTypeFactory.class)
>>   public static class B {
>
>
> Is there no other way but to declare BTypeFactory in such cases?
> I don't like the way I have to type a field name twice, one for a member
> variable and the other for an Map entry in TypeInfoFactory.
>
> Thanks in advance,
>
> Dongwon
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Fwd: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

Posted by Dongwon Kim <ea...@gmail.com>.
Any advice would be appreciated :)

Thanks,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <ea...@gmail.com>
Date: Mon, Dec 14, 2020 at 11:27 PM
Subject: How to gracefully avoid "Generic types have been disabled in the
ExecutionConfig and type java.util.List is treated as a generic type"?
To: user <us...@flink.apache.org>


Hi,

The following program compiles and runs w/o exceptions:

> public class Test {
>
>   public static class A {
>     private int n;
>
>     public A() { }
>     public int getN() {  return n;  }
>     public void setN(int n) {  this.n = n;  }
>   }
>
>   public static class B {
>     private List<A> lst;
>
>     public B() { }
>     public List<A> getLst() {  return lst;  }
>     public void setLst(List<A> lst) {  this.lst = lst;  }
>   }
>
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment();
>
>     env.fromElements(new B())
>       .print();
>
>     env.execute();
>   }
> }
>

When I add the following line,

> env.getConfig().disableGenericTypes();

then the program shows me an exception:

> Exception in thread "main" java.lang.UnsupportedOperationException:
> Generic types have been disabled in the ExecutionConfig and type
> java.util.List is treated as a generic type.
> at
> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
> at
> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
> at Test.main(Test.java:29)


To avoid this exception, I found that I have to declare a type factory like:

>   public static class BTypeFactory extends TypeInfoFactory<B> {
>     @Override
>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
> TypeInformation<?>> genericParameters) {
>       return Types.POJO(
>         B.class,
>         ImmutableMap.<String, TypeInformation<?>>builder()
>           .put("lst", Types.LIST(Types.POJO(A.class)))
>         .build()
>       );
>     }
>   }

and give it to class B as follows:

>   @TypeInfo(BTypeFactory.class)
>   public static class B {


Is there no other way but to declare BTypeFactory in such cases?
I don't like the way I have to type a field name twice, one for a member
variable and the other for an Map entry in TypeInfoFactory.

Thanks in advance,

Dongwon