You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Martin Neumann <mn...@sics.se> on 2016/04/21 13:45:52 UTC

Help with generics

Hey,

I have a FlatMap that uses some generics (appended at the end of the mail).
I have some trouble with the type inference running into
InvalidTypesException on the first line in the open function.

How can I fix it?

Cheers Martin




public class AnomalyFlatMap<M extends Model,V extends ModelValue, T>
extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
    private transient ValueState<M> microModel;
    private final double threshold;
    private boolean updateIfAnomaly;
    private M initModel;

    public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
        this.threshold = threshold;
        this.updateIfAnomaly = updateIfAnomaly;
        this.initModel = model;

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<M> descriptor =
                new ValueStateDescriptor<>(
                        "RollingMicroModel",
                        TypeInformation.of(new TypeHint<M>() {
                        }),initModel
                        );
        microModel = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly,
T>> collector) throws Exception {
        M model = microModel.value();
        Anomaly res  = model.calculateAnomaly(sample.f0);

        if ( res.getScore() <= threshold || updateIfAnomaly){
            model.addWindow(sample.f0);
            microModel.update(model);
        }
        collector.collect(new Tuple2<>(res,sample.f1));
    }
}

Re: Help with generics

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm sorry, I meant TypeInformation.of(initModel.getClass()).

On Thu, 21 Apr 2016 at 15:17 Martin Neumann <mn...@sics.se> wrote:

> Hej,
>
> I already tried TypeInformation.of(initModel.class) and it complained
> that initModel class is unknown. (Since it's of type M)
> I added a function to the model.class that returns the TypeInformation its
> working now though I still don't understand what happend behind the scenes
> and what I changed :-)
>
> cheers Martin
>
>
> On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> you're right there is not much (very little) in the documentation about
>> TypeInformation. There is only the description in the JavaDoc:
>> TypeInformation
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html> Essentially,
>> how it works is that we always use a TypeSerializer<T> when we have to
>> serialize values (for sending across network, storing in state, etc.). A
>> TypeSerializer<T> is created from a TypeInformation<T> and TypeInformation
>> can be obtained in several ways:
>>
>>  - the TypeExtractor tries to analyze user functions to determine a
>> TypeInformation for the input and output type
>>  - the TypeExtractor can try and analyze a given Class<T> to determine a
>> TypeInformation
>>  - the Scala API uses macros and implicit parameters to create
>> TypeInformation
>>  - TypeHint can be created to retrieve a TypeInformation
>>  - a TypeInformation can be manually constructed
>>
>> tl;dr In your case you can try TypeInformation.of(initModel.class). If
>> that doesn't work you can try and pass in a function that gives you a
>> TypeInformation for your model type M.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mn...@sics.se> wrote:
>>
>>> Hej,
>>>
>>> I pass an instance of M in the constructor of the class, can I use that
>>> instead? Maybe give the class a function that returns the right
>>> TypeInformation? I'm trying figure out how TypeInformation works to better
>>> understand the Issue is there any documentation about this? At the moment I
>>> don't really understand what TypeInformation does and how it works.
>>>
>>> cheers Martin
>>>
>>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I think it doesn't work because the concrete type of M is not available
>>>> to create a TypeInformation for M. What you can do is manually pass a
>>>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>>>> that when creating the state descriptor.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mn...@sics.se> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I have a FlatMap that uses some generics (appended at the end of the
>>>>> mail).
>>>>> I have some trouble with the type inference running into
>>>>> InvalidTypesException on the first line in the open function.
>>>>>
>>>>> How can I fix it?
>>>>>
>>>>> Cheers Martin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>>>>>     private transient ValueState<M> microModel;
>>>>>     private final double threshold;
>>>>>     private boolean updateIfAnomaly;
>>>>>     private M initModel;
>>>>>
>>>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>>>>         this.threshold = threshold;
>>>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>>>         this.initModel = model;
>>>>>
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         ValueStateDescriptor<M> descriptor =
>>>>>                 new ValueStateDescriptor<>(
>>>>>                         "RollingMicroModel",
>>>>>                         TypeInformation.of(new TypeHint<M>() {
>>>>>                         }),initModel
>>>>>                         );
>>>>>         microModel = getRuntimeContext().getState(descriptor);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>>>>         M model = microModel.value();
>>>>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>>>>
>>>>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>>>>             model.addWindow(sample.f0);
>>>>>             microModel.update(model);
>>>>>         }
>>>>>         collector.collect(new Tuple2<>(res,sample.f1));
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>
>

Re: Help with generics

Posted by Martin Neumann <mn...@sics.se>.
Hej,

I already tried TypeInformation.of(initModel.class) and it complained that
initModel class is unknown. (Since it's of type M)
I added a function to the model.class that returns the TypeInformation its
working now though I still don't understand what happend behind the scenes
and what I changed :-)

cheers Martin


On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> you're right there is not much (very little) in the documentation about
> TypeInformation. There is only the description in the JavaDoc:
> TypeInformation
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html> Essentially,
> how it works is that we always use a TypeSerializer<T> when we have to
> serialize values (for sending across network, storing in state, etc.). A
> TypeSerializer<T> is created from a TypeInformation<T> and TypeInformation
> can be obtained in several ways:
>
>  - the TypeExtractor tries to analyze user functions to determine a
> TypeInformation for the input and output type
>  - the TypeExtractor can try and analyze a given Class<T> to determine a
> TypeInformation
>  - the Scala API uses macros and implicit parameters to create
> TypeInformation
>  - TypeHint can be created to retrieve a TypeInformation
>  - a TypeInformation can be manually constructed
>
> tl;dr In your case you can try TypeInformation.of(initModel.class). If
> that doesn't work you can try and pass in a function that gives you a
> TypeInformation for your model type M.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mn...@sics.se> wrote:
>
>> Hej,
>>
>> I pass an instance of M in the constructor of the class, can I use that
>> instead? Maybe give the class a function that returns the right
>> TypeInformation? I'm trying figure out how TypeInformation works to better
>> understand the Issue is there any documentation about this? At the moment I
>> don't really understand what TypeInformation does and how it works.
>>
>> cheers Martin
>>
>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>> I think it doesn't work because the concrete type of M is not available
>>> to create a TypeInformation for M. What you can do is manually pass a
>>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>>> that when creating the state descriptor.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mn...@sics.se> wrote:
>>>
>>>> Hey,
>>>>
>>>> I have a FlatMap that uses some generics (appended at the end of the
>>>> mail).
>>>> I have some trouble with the type inference running into
>>>> InvalidTypesException on the first line in the open function.
>>>>
>>>> How can I fix it?
>>>>
>>>> Cheers Martin
>>>>
>>>>
>>>>
>>>>
>>>> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>>>>     private transient ValueState<M> microModel;
>>>>     private final double threshold;
>>>>     private boolean updateIfAnomaly;
>>>>     private M initModel;
>>>>
>>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>>>         this.threshold = threshold;
>>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>>         this.initModel = model;
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         ValueStateDescriptor<M> descriptor =
>>>>                 new ValueStateDescriptor<>(
>>>>                         "RollingMicroModel",
>>>>                         TypeInformation.of(new TypeHint<M>() {
>>>>                         }),initModel
>>>>                         );
>>>>         microModel = getRuntimeContext().getState(descriptor);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>>>         M model = microModel.value();
>>>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>>>
>>>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>>>             model.addWindow(sample.f0);
>>>>             microModel.update(model);
>>>>         }
>>>>         collector.collect(new Tuple2<>(res,sample.f1));
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>

Re: Help with generics

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you're right there is not much (very little) in the documentation about
TypeInformation. There is only the description in the JavaDoc:
TypeInformation
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>
Essentially,
how it works is that we always use a TypeSerializer<T> when we have to
serialize values (for sending across network, storing in state, etc.). A
TypeSerializer<T> is created from a TypeInformation<T> and TypeInformation
can be obtained in several ways:

 - the TypeExtractor tries to analyze user functions to determine a
TypeInformation for the input and output type
 - the TypeExtractor can try and analyze a given Class<T> to determine a
TypeInformation
 - the Scala API uses macros and implicit parameters to create
TypeInformation
 - TypeHint can be created to retrieve a TypeInformation
 - a TypeInformation can be manually constructed

tl;dr In your case you can try TypeInformation.of(initModel.class). If that
doesn't work you can try and pass in a function that gives you a
TypeInformation for your model type M.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mn...@sics.se> wrote:

> Hej,
>
> I pass an instance of M in the constructor of the class, can I use that
> instead? Maybe give the class a function that returns the right
> TypeInformation? I'm trying figure out how TypeInformation works to better
> understand the Issue is there any documentation about this? At the moment I
> don't really understand what TypeInformation does and how it works.
>
> cheers Martin
>
> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> I think it doesn't work because the concrete type of M is not available
>> to create a TypeInformation for M. What you can do is manually pass a
>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>> that when creating the state descriptor.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mn...@sics.se> wrote:
>>
>>> Hey,
>>>
>>> I have a FlatMap that uses some generics (appended at the end of the
>>> mail).
>>> I have some trouble with the type inference running into
>>> InvalidTypesException on the first line in the open function.
>>>
>>> How can I fix it?
>>>
>>> Cheers Martin
>>>
>>>
>>>
>>>
>>> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>>>     private transient ValueState<M> microModel;
>>>     private final double threshold;
>>>     private boolean updateIfAnomaly;
>>>     private M initModel;
>>>
>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>>         this.threshold = threshold;
>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>         this.initModel = model;
>>>
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<M> descriptor =
>>>                 new ValueStateDescriptor<>(
>>>                         "RollingMicroModel",
>>>                         TypeInformation.of(new TypeHint<M>() {
>>>                         }),initModel
>>>                         );
>>>         microModel = getRuntimeContext().getState(descriptor);
>>>     }
>>>
>>>     @Override
>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>>         M model = microModel.value();
>>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>>
>>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>>             model.addWindow(sample.f0);
>>>             microModel.update(model);
>>>         }
>>>         collector.collect(new Tuple2<>(res,sample.f1));
>>>     }
>>> }
>>>
>>>
>>>
>

Re: Help with generics

Posted by Martin Neumann <mn...@sics.se>.
Hej,

I pass an instance of M in the constructor of the class, can I use that
instead? Maybe give the class a function that returns the right
TypeInformation? I'm trying figure out how TypeInformation works to better
understand the Issue is there any documentation about this? At the moment I
don't really understand what TypeInformation does and how it works.

cheers Martin

On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I think it doesn't work because the concrete type of M is not available to
> create a TypeInformation for M. What you can do is manually pass a
> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
> that when creating the state descriptor.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mn...@sics.se> wrote:
>
>> Hey,
>>
>> I have a FlatMap that uses some generics (appended at the end of the
>> mail).
>> I have some trouble with the type inference running into
>> InvalidTypesException on the first line in the open function.
>>
>> How can I fix it?
>>
>> Cheers Martin
>>
>>
>>
>>
>> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>>     private transient ValueState<M> microModel;
>>     private final double threshold;
>>     private boolean updateIfAnomaly;
>>     private M initModel;
>>
>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>         this.threshold = threshold;
>>         this.updateIfAnomaly = updateIfAnomaly;
>>         this.initModel = model;
>>
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         ValueStateDescriptor<M> descriptor =
>>                 new ValueStateDescriptor<>(
>>                         "RollingMicroModel",
>>                         TypeInformation.of(new TypeHint<M>() {
>>                         }),initModel
>>                         );
>>         microModel = getRuntimeContext().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>         M model = microModel.value();
>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>
>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>             model.addWindow(sample.f0);
>>             microModel.update(model);
>>         }
>>         collector.collect(new Tuple2<>(res,sample.f1));
>>     }
>> }
>>
>>
>>

Re: Help with generics

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think it doesn't work because the concrete type of M is not available to
create a TypeInformation for M. What you can do is manually pass a
TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
that when creating the state descriptor.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mn...@sics.se> wrote:

> Hey,
>
> I have a FlatMap that uses some generics (appended at the end of the mail).
> I have some trouble with the type inference running into
> InvalidTypesException on the first line in the open function.
>
> How can I fix it?
>
> Cheers Martin
>
>
>
>
> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>     private transient ValueState<M> microModel;
>     private final double threshold;
>     private boolean updateIfAnomaly;
>     private M initModel;
>
>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>         this.threshold = threshold;
>         this.updateIfAnomaly = updateIfAnomaly;
>         this.initModel = model;
>
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<M> descriptor =
>                 new ValueStateDescriptor<>(
>                         "RollingMicroModel",
>                         TypeInformation.of(new TypeHint<M>() {
>                         }),initModel
>                         );
>         microModel = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>         M model = microModel.value();
>         Anomaly res  = model.calculateAnomaly(sample.f0);
>
>         if ( res.getScore() <= threshold || updateIfAnomaly){
>             model.addWindow(sample.f0);
>             microModel.update(model);
>         }
>         collector.collect(new Tuple2<>(res,sample.f1));
>     }
> }
>
>
>