Posted to user@flink.apache.org by Laurent Exsteens <la...@euranova.eu> on 2020/03/28 13:01:59 UTC

State & Generics

Hello,

Using Flink 1.8.1, I'm getting the following error:
     *The TypeHint is using a generic variable.This is not supported,
generic types must be fully specified for the TypeHint.*
when trying to create a ListStateDescriptor with a generic type (full
sample code in attachment):

public class AND<LEFT, RIGHT> extends RichCoFlatMapFunction<LEFT, RIGHT, Tuple2<LEFT, RIGHT>> {

    private transient ListState<LEFT> leftState;
    private transient ListState<RIGHT> rightState;

    @Override
    public void open(Configuration config) {
        ListStateDescriptor<LEFT> left_descriptor =
            new ListStateDescriptor<>(
                "and_left",
                TypeInformation.of(new TypeHint<LEFT>() {
                }));
        leftState = getRuntimeContext().getListState(left_descriptor);

This gives me the following stack trace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
    at eu.euranova.leadcep.Main.main(Main.java:61)
Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
    at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:54)
    at eu.euranova.leadcep.AND$1.<init>(AND.java:22)
    at eu.euranova.leadcep.AND.open(AND.java:19)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)


Googling the error didn't give a working solution.

Is there a way to work with state using generic types? If yes, how?

Thanks in advance for your help!

Best Regards,

Laurent.

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu <http://euranova.eu/>*

*research.euranova.eu* <http://research.euranova.eu/>

-- 
♻ Be green, keep it on the screen

Re: State & Generics

Posted by Laurent Exsteens <la...@euranova.eu>.
Hi Aljoscha,

Thank you for your answer!

Out of curiosity, would writing my own serializer involve implementing a
serialisation for every type I could get?

On Wed, Apr 1, 2020, 13:57 Aljoscha Krettek <al...@apache.org> wrote:

> Hi Laurent!
>
> On 31.03.20 10:43, Laurent Exsteens wrote:
> > Yesterday I managed to find another solution: create the type information
> > outside of the class and pass it to the constructor. I can retrieve the
> > type information from DataStream.getType(). This works well, and is
> > acceptable in my case.
>
> This is a valid solution which is also used internally in some parts of
> Flink.
>
> > I'm starting to understand that the problem resides in Java Generics type
> > erasure: we cannot create the TypeInformation using a TypeHint inside the
> > Generic class, since creating the TypeHint is a runtime operation which
> > cannot access the generic type anymore, since it has been erased at compile
> > time.
> > Is my understanding correct?
>
> Yes, this seems correct.
>
> >
> > I'm still interested in a solution where everything could be done inside
> > the flatMap function, and without extra wrap and unwrap (although I'm
> > starting to think that is not possible due to type erasure).
>
> It would be possible if you write your own completely custom TypeSerializer,
> but that is probably a lot more effort than it would be worth,
> considering you also found the alternative solution.
>
> Best,
> Aljoscha
>

Re: State & Generics

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Laurent!

On 31.03.20 10:43, Laurent Exsteens wrote:
> Yesterday I managed to find another solution: create the type information
> outside of the class and pass it to the constructor. I can retrieve the
> type information from DataStream.getType(). This works well, and is
> acceptable in my case.

This is a valid solution which is also used internally in some parts of 
Flink.

> I'm starting to understand that the problem resides in Java Generics type
> erasure: we cannot create the TypeInformation using a TypeHint inside the
> Generic class, since creating the TypeHint is a runtime operation which
> cannot access the generic type anymore, since it has been erased at compile
> time.
> Is my understanding correct?

Yes, this seems correct.

> 
> I'm still interested in a solution where everything could be done inside
> the flatMap function, and without extra wrap and unwrap (although I'm
> starting to think that is not possible due to type erasure).

It would be possible if you write your own completely custom TypeSerializer, 
but that is probably a lot more effort than it would be worth, 
considering you also found the alternative solution.

Best,
Aljoscha

Re: State & Generics

Posted by Laurent Exsteens <la...@euranova.eu>.
Hello Mike,

Thanks for the info.
I tried to do something similar in Java. I'm not there yet, but I think it
should be feasible.

However, like you said, that means additional operations for each event.

Yesterday I managed to find another solution: create the type information
outside of the class and pass it to the constructor. I can retrieve the
type information from DataStream.getType(). This works well, and is
acceptable in my case.
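
The constructor approach described above can be sketched without Flink on the classpath. In the minimal sketch below, `TypeInfo` and `Descriptor` are hypothetical stand-ins for Flink's `TypeInformation` and `ListStateDescriptor`, `And` plays the role of the `AND` function, and the state name `and_right` is an assumption. Only the shape of the pattern is meant to carry over: the type information is created where the types are concrete and handed to the generic class, which just forwards it.

```java
import java.util.Objects;

public class ConstructorTypeInfoDemo {

    // Hypothetical stand-in for Flink's TypeInformation<T>.
    static final class TypeInfo<T> {
        final Class<T> typeClass;

        TypeInfo(Class<T> typeClass) {
            this.typeClass = Objects.requireNonNull(typeClass);
        }
    }

    // Hypothetical stand-in for ListStateDescriptor<T>: a state name plus
    // the type information of the list elements.
    static final class Descriptor<T> {
        final String name;
        final TypeInfo<T> elementType;

        Descriptor(String name, TypeInfo<T> elementType) {
            this.name = name;
            this.elementType = elementType;
        }
    }

    // Generic function class: it never tries to inspect LEFT or RIGHT
    // itself, it only forwards the type information it was given.
    static final class And<LEFT, RIGHT> {
        private final TypeInfo<LEFT> leftType;
        private final TypeInfo<RIGHT> rightType;
        Descriptor<LEFT> leftDescriptor;
        Descriptor<RIGHT> rightDescriptor;

        And(TypeInfo<LEFT> leftType, TypeInfo<RIGHT> rightType) {
            this.leftType = leftType;
            this.rightType = rightType;
        }

        // Plays the role of open(): descriptors are built from the injected
        // type information, so no type hint is needed in here.
        void open() {
            leftDescriptor = new Descriptor<>("and_left", leftType);
            rightDescriptor = new Descriptor<>("and_right", rightType); // assumed name
        }
    }

    // Call site: the element types are concrete here.
    static And<String, Integer> build() {
        And<String, Integer> and =
                new And<>(new TypeInfo<>(String.class), new TypeInfo<>(Integer.class));
        and.open();
        return and;
    }

    public static void main(String[] args) {
        And<String, Integer> and = build();
        System.out.println(and.leftDescriptor.name + " -> "
                + and.leftDescriptor.elementType.typeClass.getName());
        System.out.println(and.rightDescriptor.name + " -> "
                + and.rightDescriptor.elementType.typeClass.getName());
    }
}
```

In the actual job, the stand-in arguments would presumably be replaced by the values returned from DataStream.getType() on the two input streams.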

I'm starting to understand that the problem resides in Java Generics type
erasure: we cannot create the TypeInformation using a TypeHint inside the
Generic class, since creating the TypeHint is a runtime operation which
cannot access the generic type anymore, since it has been erased at compile
time.
Is my understanding correct?
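
The erasure effect can be reproduced with plain Java reflection. The sketch below uses a hypothetical `Hint` class (not Flink's `TypeHint`) that reads its type argument back from the anonymous subclass, which is the general technique such type hints tend to rely on. Created where the type is concrete, the hint sees `String`; created inside a generic class, it only sees the type variable `LEFT`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {

    // Hypothetical stand-in for a type hint (not Flink's TypeHint): the
    // anonymous subclass stores its type argument in its class metadata.
    abstract static class Hint<T> {
        final Type captured;

        Hint() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            captured = superType.getActualTypeArguments()[0];
        }

        // Shallow check: only looks at the top-level type argument.
        boolean isFullySpecified() {
            return !(captured instanceof TypeVariable);
        }
    }

    // Generic class, analogous in shape to AND<LEFT, RIGHT>.
    static class Holder<LEFT> {
        Hint<LEFT> hintFromInside() {
            // This anonymous class is compiled once for every LEFT, so its
            // metadata can only name the variable, never the actual type.
            return new Hint<LEFT>() {};
        }
    }

    static Hint<String> hintFromOutside() {
        // Here the type argument is a concrete class and is recorded as such.
        return new Hint<String>() {};
    }

    public static void main(String[] args) {
        Hint<String> outside = hintFromOutside();
        Hint<String> inside = new Holder<String>().hintFromInside();
        System.out.println(outside.captured.getTypeName()
                + " fully specified: " + outside.isFullySpecified());
        System.out.println(inside.captured.getTypeName()
                + " fully specified: " + inside.isFullySpecified());
    }
}
```

Even though the holder is instantiated as `Holder<String>`, the hint created inside it has no way to learn about `String`, which matches the situation the error message complains about.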

I'm still interested in a solution where everything could be done inside
the flatMap function, and without extra wrap and unwrap (although I'm
starting to think that is not possible due to type erasure).

Regards,

Laurent.




On Mon, 30 Mar 2020 at 04:50, Mike Mintz <mi...@gmail.com> wrote:

> I was able to get generic types to work when I used GenericTypeInfo and
> made sure to wrap the generic in some concrete type. In my case I used
> scala.Some as the wrapper. It looks something like this (in Scala):
>
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.api.java.typeutils.GenericTypeInfo
> val descriptor = new ListStateDescriptor[Some[T]](
>   "blah", new GenericTypeInfo(classOf[Some[T]]))
>
> Since the descriptor is Some[T] instead of T, I had to wrap and unwrap it
> every time I used it.

Re: State & Generics

Posted by Mike Mintz <mi...@gmail.com>.
I was able to get generic types to work when I used GenericTypeInfo and
made sure to wrap the generic in some concrete type. In my case I used
scala.Some as the wrapper. It looks something like this (in Scala):

import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.java.typeutils.GenericTypeInfo
val descriptor = new ListStateDescriptor[Some[T]](
  "blah", new GenericTypeInfo(classOf[Some[T]]))

Since the descriptor is Some[T] instead of T, I had to wrap and unwrap it
every time I used it.
