Posted to user@flink.apache.org by Martin Neumann <mn...@sics.se> on 2016/04/21 13:45:52 UTC
Help with generics
Hey,
I have a FlatMap that uses some generics (appended at the end of the mail).
Type inference fails with an InvalidTypesException on the first statement of
the open function.
How can I fix it?
Cheers Martin
public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
        extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {

    private transient ValueState<M> microModel;
    private final double threshold;
    private boolean updateIfAnomaly;
    private M initModel;

    public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
        this.threshold = threshold;
        this.updateIfAnomaly = updateIfAnomaly;
        this.initModel = model;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<M> descriptor =
                new ValueStateDescriptor<>(
                        "RollingMicroModel",
                        TypeInformation.of(new TypeHint<M>() {}),
                        initModel);
        microModel = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector)
            throws Exception {
        M model = microModel.value();
        Anomaly res = model.calculateAnomaly(sample.f0);
        if (res.getScore() <= threshold || updateIfAnomaly) {
            model.addWindow(sample.f0);
            microModel.update(model);
        }
        collector.collect(new Tuple2<>(res, sample.f1));
    }
}
Re: Help with generics
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I'm sorry, I meant TypeInformation.of(initModel.getClass()).
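The difference between the two spellings: `.class` applies to a type name, so neither `initModel.class` nor `M.class` compiles, while `initModel.getClass()` asks the instance for its runtime class. A minimal sketch in plain Java with no Flink dependency; `ModelHolder` and `MeanModel` are hypothetical names standing in for AnomalyFlatMap and a concrete model:

```java
// Hypothetical stand-in for a concrete model class.
final class MeanModel { }

// Hypothetical stand-in for a generic operator such as AnomalyFlatMap<M, V, T>.
final class ModelHolder<M> {
    private final M initModel;

    ModelHolder(M initModel) {
        this.initModel = initModel;
    }

    // M.class and initModel.class do not compile; the instance itself,
    // however, still knows its concrete class at runtime.
    @SuppressWarnings("unchecked")
    Class<M> modelClass() {
        return (Class<M>) initModel.getClass();
    }
}

final class GetClassDemo {
    public static void main(String[] args) {
        ModelHolder<MeanModel> holder = new ModelHolder<>(new MeanModel());
        // Prints the name of the runtime class of the wrapped instance.
        System.out.println(holder.modelClass().getName());
    }
}
```

In the Flink code, a class object obtained this way is what would be handed to TypeInformation.of(...). Two caveats: getClass() returns the exact runtime class, which may be a subclass of the declared M, and it returns a raw class, so if the model class is itself generic its type arguments are not recovered this way.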
Re: Help with generics
Posted by Martin Neumann <mn...@sics.se>.
Hej,
I already tried TypeInformation.of(initModel.class) and it complained that
the class of initModel is unknown (since it's of type M).
I added a function to the model class that returns the TypeInformation, and
it's working now, though I still don't understand what happened behind the
scenes and what I changed :-)
cheers Martin
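One possible shape for such a function, not necessarily what was done in this thread: every concrete model knows its own type statically, so it can report that type, and the generic operator only has to ask. The sketch below is plain Java and uses java.lang.Class as a stand-in for Flink's TypeInformation; `SelfDescribing`, `MedianModel` and `Detector` are hypothetical names.

```java
// Hypothetical interface: a model that can describe its own type.
// In Flink, the return type would be TypeInformation<M> rather than Class<M>.
interface SelfDescribing<M extends SelfDescribing<M>> {
    Class<M> typeToken();
}

// Hypothetical concrete model.
final class MedianModel implements SelfDescribing<MedianModel> {
    @Override
    public Class<MedianModel> typeToken() {
        // The concrete type is spelled out here, so nothing has to be inferred.
        return MedianModel.class;
    }
}

// Hypothetical generic operator: M is only a type variable in here,
// but the token returned by the model carries the concrete type.
final class Detector<M extends SelfDescribing<M>> {
    private final Class<M> modelType;

    Detector(M initModel) {
        // No reflection on M and no unchecked cast needed.
        this.modelType = initModel.typeToken();
    }

    Class<M> modelType() {
        return modelType;
    }
}

final class SelfDescribingDemo {
    public static void main(String[] args) {
        Detector<MedianModel> detector = new Detector<>(new MedianModel());
        System.out.println(detector.modelType().getName());
    }
}
```

This works because the concrete type is written down in the model class's own source, where it is not a type variable; the generic class never has to resolve M itself.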
Re: Help with generics
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you're right, there is very little in the documentation about
TypeInformation. There is only the description in the JavaDoc:
TypeInformation
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>
Essentially, we always use a TypeSerializer<T> when we have to serialize
values (for sending across the network, storing in state, etc.). A
TypeSerializer<T> is created from a TypeInformation<T>, and a TypeInformation
can be obtained in several ways:
- the TypeExtractor tries to analyze user functions to determine a
  TypeInformation for the input and output type
- the TypeExtractor can try to analyze a given Class<T> to determine a
  TypeInformation
- the Scala API uses macros and implicit parameters to create a
  TypeInformation
- a TypeHint can be created to retrieve a TypeInformation
- a TypeInformation can be constructed manually
tl;dr In your case you can try TypeInformation.of(initModel.class). If that
doesn't work you can try and pass in a function that gives you a
TypeInformation for your model type M.
Cheers,
Aljoscha
Re: Help with generics
Posted by Martin Neumann <mn...@sics.se>.
Hej,
I pass an instance of M in the constructor of the class; can I use that
instead? Maybe give the class a function that returns the right
TypeInformation? I'm trying to figure out how TypeInformation works to better
understand the issue. Is there any documentation about this? At the moment I
don't really understand what TypeInformation does and how it works.
cheers Martin
Re: Help with generics
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think it doesn't work because the concrete type of M is not available to
create a TypeInformation for M. What you can do is manually pass a
TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
that when creating the state descriptor.
Cheers,
Aljoscha
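Why the concrete type of M is not available can be shown without Flink. A type-capturing helper that is instantiated as an anonymous subclass, which is how TypeHint is used in the original code, can only read back what the compiler recorded as the superclass type argument. Inside a generic class that argument is the type variable M itself, not the type the caller chose. A minimal sketch in plain Java; `Capture`, `Holder` and `ErasureDemo` are hypothetical names, and `Capture` is a simplified stand-in, not Flink's implementation:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a type-capturing helper used via an anonymous subclass.
abstract class Capture<T> {
    // Returns the type argument recorded for the anonymous subclass's superclass.
    Type captured() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

// Mirrors the shape of AnomalyFlatMap: M is only a type variable in here.
final class Holder<M> {
    Type captureM() {
        return new Capture<M>() { }.captured();
    }
}

final class ErasureDemo {
    // Concrete type argument written in the source: fully recoverable.
    static Type captureConcrete() {
        return new Capture<String>() { }.captured();
    }

    // The caller picks String, but the anonymous class inside Holder only recorded "M".
    static Type captureVariable() {
        return new Holder<String>().captureM();
    }

    public static void main(String[] args) {
        System.out.println(captureConcrete() == String.class);
        System.out.println(captureVariable() instanceof TypeVariable);
    }
}
```

The second case is the situation in open(): the hint is created where M is still a variable, so there is nothing concrete to build a TypeInformation from. Supplying the type from a place where it is concrete, for example through a constructor argument as suggested above, avoids the problem.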