Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2016/02/09 18:03:29 UTC

Dataset filter improvement

Hi to all,

in my program I have a Dataset that generates different types of objects
depending on the incoming element.
Thus it's like a Map<Tuple2,Object>.
To type the different generated datasets I do something like this:

Dataset<Tuple2> start =...

Dataset<MyObj1> ds1 = start.filter().map(..);
Dataset<MyObj1> ds2 = start.filter().map(..);
Dataset<MyObj3> ds3 = start.filter().map(..);
Dataset<MyObj3> ds4 = start.filter().map(..);

However this is very inefficient (I think because Flink needs to
materialize the entire source dataset for every slot).

It's much more efficient to group the generation of objects of the same
type. E.g.:

Dataset<Tuple2> start =..

Dataset<MyObj1> tmp1 = start.map(..);
Dataset<MyObj3> tmp2 = start.map(..);
Dataset<MyObj1> ds1 = tmp1.filter();
Dataset<MyObj1> ds2 = tmp1.filter();
Dataset<MyObj3> ds3 = tmp2.filter();
Dataset<MyObj3> ds4 = tmp2.filter();

Increasing the number of slots per task manager makes things worse and
worse :)
Is there a way to improve this situation? Is it possible to write a "map"
that generates different types of objects and then filter them by the
generated class type?
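
For illustration only, the "map once, then filter by generated class" idea can be sketched with plain Java collections rather than the Flink API. The names Obj1, Obj3, convert, convertAll and selectByType are hypothetical stand-ins, and the "n:" prefix rule is an invented example of how the incoming element might decide the output type.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink API): convert each element once, then select by class. */
class TypedSplitSketch {
    // Hypothetical result types standing in for MyObj1 / MyObj3.
    static class Obj1 { final String v; Obj1(String v) { this.v = v; } }
    static class Obj3 { final int v; Obj3(int v) { this.v = v; } }

    /** Single "map": the produced type depends on the incoming element (assumed rule). */
    static Object convert(String in) {
        if (in.startsWith("n:")) {
            return new Obj3(Integer.parseInt(in.substring(2)));
        }
        return new Obj1(in);
    }

    /** Applies convert to every input exactly once. */
    static List<Object> convertAll(List<String> input) {
        List<Object> out = new ArrayList<>();
        for (String s : input) {
            out.add(convert(s));
        }
        return out;
    }

    /** "Filter by generated class": keep only instances of the requested type. */
    static <T> List<T> selectByType(List<Object> all, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (Object o : all) {
            if (type.isInstance(o)) {
                out.add(type.cast(o));
            }
        }
        return out;
    }
}
```

Each typed list is then obtained with a call such as selectByType(all, TypedSplitSketch.Obj1.class); whether the equivalent Flink plan is cheaper than the filter-then-map version is exactly the open question here.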

Best,
Flavio

Re: Dataset filter improvement

Posted by Till Rohrmann <tr...@apache.org>.
Hi Flavio,

it works the following way: Your data type will be serialized by the
PojoSerializer iff it is a POJO. Iff it is a generic type which cannot be
serialized by any of the other serializers, then Kryo is used.

If it is a POJO type and you have a DataStream which can also contain
subtypes of this type, then registering these subtypes will avoid writing
their complete class name. Instead a shorter ID will be written.

If it is a generic type and your data type is not automatically registered
(e.g. because your data stream also contains subtypes), then registering
these types will have the same effect. Instead of writing the complete
class name every time you serialize your data, you will only write a much
shorter ID.
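
To make the size difference concrete, here is a toy model of a per-record class tag. It is only a sketch under stated assumptions: the byte layout below is invented for illustration and is not Flink's or Kryo's actual wire format, and ClassTagSketch, register and writeTag are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Toy model of a class tag; NOT the real Flink or Kryo format. */
class ClassTagSketch {
    private final Map<Class<?>, Integer> registered = new HashMap<>();

    /** Assigns the next free ID to the class (hypothetical analogue of registering a type). */
    void register(Class<?> type) {
        registered.putIfAbsent(type, registered.size());
    }

    /** Returns the bytes this toy format writes to identify the class of one record. */
    byte[] writeTag(Class<?> type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = registered.get(type);
            if (id != null) {
                out.writeBoolean(true);        // registered: a short ID follows
                out.writeShort(id);
            } else {
                out.writeBoolean(false);       // unregistered: the full class name follows
                out.writeUTF(type.getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }
}
```

In this toy format an unregistered java.util.ArrayList tag takes 22 bytes and a registered one takes 3 bytes; the real savings depend on the serializer in use.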

Cheers,
Till

On Wed, Feb 24, 2016 at 9:39 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Thanks Max and Till for the answers. However, I still don't fully
> understand the difference. Here are my doubts:
>
>    - If I don't register any of my POJO classes, they will be serialized
>    with Kryo (black box for Flink)
>    - If I register all of my POJOs using env.registerType they will be
>    serialized as POJOs (which is slower than Tuple serialization but much
>    faster than Kryo)
>    - What if I call env.registerTypeWithKryoSerializer()? Why should I
>    specify a serializer for Kryo?
>
> Best,
> Flavio
>
>
> On Tue, Feb 23, 2016 at 4:08 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Registering a data type is only relevant for the Kryo serializer or if
>> you want to serialize a subclass of a POJO. Registering has the advantage
>> that you assign an id to the class which is written instead of the full
>> class name. The latter is usually much longer than the id.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 23, 2016 at 3:17 PM, Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I think the point is that Flink can use its serialization tools if you
>>> register the class in advance. If you don't do that, it will use Kryo
>>> as a fall-back which is slightly less efficient.
>>>
>>> Equals and hash code have to be implemented correctly if you compare
>>> Pojos. For standard types like String or Integer, this is done
>>> automatically. For Pojos, Flink doesn't know whether it is implemented
>>> correctly or not. Every object in Java has a default equals and
>>> hashCode implementation.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
>>> <po...@okkam.it> wrote:
>>> > Hi Max,
>>> > why do I need to register them? My job also runs without problems
>>> > without that.
>>> > The only problem with my POJOs was that I had to implement equals and
>>> > hashCode correctly; Flink didn't force me to do it, but then the
>>> > results were wrong :(
>>> >
>>> >
>>> >
>>> > On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mx...@apache.org>
>>> > wrote:
>>> >>
>>> >> Hi Flavio,
>>> >>
>>> >> Stephan was referring to
>>> >>
>>> >> env.registerType(ExtendedClass1.class);
>>> >> env.registerType(ExtendedClass2.class);
>>> >>
>>> >> Cheers,
>>> >> Max
>>> >>
>>> >> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>>> >> <po...@okkam.it> wrote:
>>> >> > What do you mean exactly? I'm probably missing something here.
>>> >> > Remember that I can specify the right subclass only after the last
>>> >> > flatMap; after the first map neither Flink nor I can know the exact
>>> >> > subclass of BaseClass.
>>> >> >
>>> >> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <se...@apache.org>
>>> >> > wrote:
>>> >> >>
>>> >> >> Class hierarchies should definitely work, even if the base class
>>> >> >> has no fields.
>>> >> >>
>>> >> >> They work more efficiently if you register the subclasses at the
>>> >> >> execution environment (Flink cannot infer them from the function
>>> >> >> signatures because the function signatures only contain the
>>> >> >> abstract base class).
>>> >> >>
>>> >> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>>> >> >> <po...@okkam.it> wrote:
>>> >> >>>
>>> >> >>> Because the classes are not related to each other. Do you think
>>> >> >>> it's a good idea to have something like this?
>>> >> >>>
>>> >> >>> abstract class BaseClass {
>>> >> >>>    String someField;
>>> >> >>> }
>>> >> >>>
>>> >> >>> class ExtendedClass1 extends BaseClass {
>>> >> >>>    String someOtherField11;
>>> >> >>>    String someOtherField12;
>>> >> >>>    String someOtherField13;
>>> >> >>>  ...
>>> >> >>> }
>>> >> >>>
>>> >> >>> class ExtendedClass2 extends BaseClass {
>>> >> >>>    Integer someOtherField21;
>>> >> >>>    Double someOtherField22;
>>> >> >>>    Integer someOtherField23;
>>> >> >>>  ...
>>> >> >>> }
>>> >> >>>
>>> >> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
>>> >> >>> flatMap that can be used to generate the specific datasets?
>>> >> >>> Doesn't this cause problems for Flink? The classes can be very
>>> >> >>> different from each other, so maybe this can cause problems with
>>> >> >>> the plan generation, can't it?
>>> >> >>>
>>> >> >>> Thanks Fabian and Stephan for the support!
>>> >> >>>
>>> >> >>>
>>> >> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <se...@apache.org>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> Why not use an abstract base class and N subclasses?
>>> >> >>>>
>>> >> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fhueske@gmail.com>
>>> >> >>>> wrote:
>>> >> >>>>>
>>> >> >>>>> Unfortunately, there is no Either<1,...,n>.
>>> >> >>>>> You could implement something like a Tuple3<Option<Type1>,
>>> >> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide
>>> >> >>>>> an Option type (Java 8 comes with Optional). You would need to
>>> >> >>>>> implement it yourself, incl. TypeInfo and Serializer. You can get
>>> >> >>>>> some inspiration from the Either type info / serializer if you
>>> >> >>>>> want to go this way.
>>> >> >>>>>
>>> >> >>>>> Using a byte array would also work but doesn't look much easier
>>> >> >>>>> than the Option approach to me.
>>> >> >>>>>
>>> >> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>> >> >>>>>>
>>> >> >>>>>> Yes, the intermediate datasets I create are then joined again
>>> >> >>>>>> with each other. What I'd need is an Either<1,...,n>. Is that
>>> >> >>>>>> possible to add?
>>> >> >>>>>> Otherwise I was thinking of generating a Tuple2<String,byte[]>
>>> >> >>>>>> and, in the subsequent filter+map/flatMap, deserializing only
>>> >> >>>>>> those elements I want to group together (e.g.
>>> >> >>>>>> "someEventType".equals(t.f0)) in order to generate the typed
>>> >> >>>>>> dataset.
>>> >> >>>>>> Which one do you think is the best solution?
>>> >> >>>>>>
>>> >> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fhueske@gmail.com>
>>> >> >>>>>> wrote:
>>> >> >>>>>>>
>>> >> >>>>>>> Hi Flavio,
>>> >> >>>>>>>
>>> >> >>>>>>> I did not completely understand which objects should go where,
>>> >> >>>>>>> but here are some general guidelines:
>>> >> >>>>>>>
>>> >> >>>>>>> - early filtering is mostly a good idea (unless evaluating the
>>> >> >>>>>>> filter expression is very expensive)
>>> >> >>>>>>> - you can use a flatMap function to combine a map and a filter
>>> >> >>>>>>> - applying multiple functions on the same data set does not
>>> >> >>>>>>> necessarily materialize the data set (in memory or on disk). In
>>> >> >>>>>>> most cases it prevents chaining, hence there is serialization
>>> >> >>>>>>> overhead. In some cases where the forked data streams are
>>> >> >>>>>>> joined again, the data set must be materialized in order to
>>> >> >>>>>>> avoid deadlocks.
>>> >> >>>>>>> - it is not possible to write a map that generates two
>>> >> >>>>>>> different types, but you could implement a mapper that returns
>>> >> >>>>>>> an Either<First, Second> type.
>>> >> >>>>>>>
>>> >> >>>>>>> Hope this helps,
>>> >> >>>>>>> Fabian
>>> >> >>>>>>>
>>> >> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier
>>> >> >>>>>>> <po...@okkam.it>:
>>> >> >>>>>>>>
>>> >> >>>>>>>> Any help on this?
>>> >> >>>>>>>>
>>> >> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <pompermaier@okkam.it>
>>> >> >>>>>>>> wrote:
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Hi to all,
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> in my program I have a Dataset that generates different types
>>> >> >>>>>>>>> of objects depending on the incoming element.
>>> >> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
>>> >> >>>>>>>>> To type the different generated datasets I do something like
>>> >> >>>>>>>>> this:
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Dataset<Tuple2> start =...
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>>> >> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>>> >> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>>> >> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> However this is very inefficient (I think because Flink
>>> >> >>>>>>>>> needs to materialize the entire source dataset for every
>>> >> >>>>>>>>> slot).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> It's much more efficient to group the generation of objects
>>> >> >>>>>>>>> of the same type. E.g.:
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Dataset<Tuple2> start =..
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
>>> >> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
>>> >> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>>> >> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>>> >> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>>> >> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Increasing the number of slots per task manager makes things
>>> >> >>>>>>>>> worse and worse :)
>>> >> >>>>>>>>> Is there a way to improve this situation? Is it possible to
>>> >> >>>>>>>>> write a "map" that generates different types of objects and
>>> >> >>>>>>>>> then filter them by the generated class type?
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Best,
>>> >> >>>>>>>>> Flavio
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>
>>> >> >>>>>>
>>> >> >>>>>>
>>> >> >>>>>
>>> >> >>>>
>>> >> >>>
>>> >> >>
>>> >> >
>>>
>>
>>
>
>
>
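
Two of the guidelines quoted above (a flatMap that combines a map and a filter, and a mapper that returns an Either) can be sketched in plain Java. This is only an illustration under stated assumptions: the Either class below is a minimal hand-rolled stand-in rather than Flink's own type, and ForkSketch, parseIfNumeric and classify are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java sketch (no Flink API) of the flatMap and Either guidelines. */
class ForkSketch {
    /** Minimal hand-rolled Either; a stand-in, not Flink's class. */
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> right(R value) {
            return new Either<>(null, value, false);
        }
    }

    /** flatMap-style step: filters and maps at once by emitting zero or one result. */
    static void parseIfNumeric(String in, Consumer<Integer> out) {
        boolean numeric = !in.isEmpty() && in.length() <= 9;
        for (int i = 0; numeric && i < in.length(); i++) {
            char c = in.charAt(i);
            numeric = c >= '0' && c <= '9';
        }
        if (numeric) {
            out.accept(Integer.parseInt(in));
        }
    }

    /** Mapper with one declared output type that still carries two kinds of value. */
    static Either<Integer, String> classify(String in) {
        List<Integer> parsed = new ArrayList<>(1);
        parseIfNumeric(in, parsed::add);
        if (parsed.isEmpty()) {
            return Either.right(in);
        }
        return Either.left(parsed.get(0));
    }
}
```

Downstream code would branch on isLeft to build the two typed collections; with more than two target types this pattern stops scaling, which is the limitation raised in the quoted discussion.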

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks Max and Till for the answers. However, I still don't fully
understand the difference. Here are my doubts:

   - If I don't register any of my POJO classes, they will be serialized
   with Kryo (black box for Flink)
   - If I register all of my POJOs using env.registerType they will be
   serialized as POJOs (which is slower than Tuple serialization but much
   faster than Kryo)
   - What if I call env.registerTypeWithKryoSerializer()? Why should I
   specify a serializer for Kryo?

Best,
Flavio

Re: Dataset filter improvement

Posted by Till Rohrmann <tr...@apache.org>.
Registering a data type is only relevant for the Kryo serializer or if you
want to serialize a subclass of a POJO. Registering has the advantage that
you assign an id to the class which is written instead of the full class
name. The latter is usually much longer than the id.

Cheers,
Till

Re: Dataset filter improvement

Posted by Maximilian Michels <mx...@apache.org>.
Hi Flavio,

I think the point is that Flink can use its serialization tools if you
register the class in advance. If you don't do that, it will use Kryo
as a fall-back which is slightly less efficient.

Equals and hash code have to be implemented correctly if you compare
Pojos. For standard types like String or Integer, this is done
automatically. For Pojos, Flink doesn't know whether it is implemented
correctly or not. Every object in Java has a default equals and
hashCode implementation.
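
As a small illustration of the Java default mentioned above, the following sketch contrasts a class that inherits identity-based equals/hashCode from Object with one that overrides both. It uses only the JDK; RawPoint, ValuePoint and distinctCount are hypothetical names, and it does not show how Flink itself compares keys.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** JDK-only sketch: default (identity) equality versus value equality. */
class EqualitySketch {
    /** No overrides: inherits identity-based equals/hashCode from Object. */
    static class RawPoint {
        final int x;
        final int y;

        RawPoint(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Value-based equality: equal fields give equal objects and equal hash codes. */
    static class ValuePoint {
        final int x;
        final int y;

        ValuePoint(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ValuePoint)) {
                return false;
            }
            ValuePoint p = (ValuePoint) o;
            return x == p.x && y == p.y;
        }

        @Override
        public int hashCode() {
            return Objects.hash(x, y);
        }
    }

    /** Counts distinct elements the way a hash-based collection sees them. */
    static int distinctCount(Object... items) {
        Set<Object> set = new HashSet<>();
        for (Object o : items) {
            set.add(o);
        }
        return set.size();
    }
}
```

Two RawPoint instances with the same coordinates count as two distinct elements, while two such ValuePoint instances count as one, which is the kind of silent difference that can lead to wrong results without any error being raised.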

Cheers,
Max

On Wed, Feb 17, 2016 at 10:22 AM, Flavio Pompermaier
<po...@okkam.it> wrote:
> Hi Max,
> why do I need to register them? My job also runs without problems
> without that.
> The only problem with my POJOs was that I had to implement equals and
> hashCode correctly; Flink didn't force me to do it, but then the results
> were wrong :(
>
>
>
> On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mx...@apache.org> wrote:
>>
>> Hi Flavio,
>>
>> Stephan was referring to
>>
>> env.registerType(ExtendedClass1.class);
>> env.registerType(ExtendedClass2.class);
>>
>> Cheers,
>> Max
>>
>> On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
>> <po...@okkam.it> wrote:
>> > What do you mean exactly? I'm probably missing something here.
>> > Remember that I can specify the right subclass only after the last
>> > flatMap; after the first map neither Flink nor I can know the exact
>> > subclass of BaseClass.
>> >
>> > On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <se...@apache.org> wrote:
>> >>
>> >> Class hierarchies should definitely work, even if the base class has no
>> >> fields.
>> >>
>> >> They work more efficiently if you register the subclasses at the
>> >> execution
>> >> environment (Flink cannot infer them from the function signatures
>> >> because
>> >> the function signatures only contain the abstract base class).
>> >>
>> >> On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier
>> >> <po...@okkam.it> wrote:
>> >>>
>> >>> Because the classes are not related to each other. Do you think it's a
>> >>> good idea to have something like this?
>> >>>
>> >>> abstract class BaseClass {
>> >>>    String someField;
>> >>> }
>> >>>
>> >>> class ExtendedClass1 extends BaseClass {
>> >>>    String someOtherField11;
>> >>>    String someOtherField12;
>> >>>    String someOtherField13;
>> >>>  ...
>> >>> }
>> >>>
>> >>> class ExtendedClass2 extends BaseClass {
>> >>>    Integer someOtherField21;
>> >>>    Double someOtherField22;
>> >>>    Integer someOtherField23;
>> >>>  ...
>> >>> }
>> >>>
>> >>> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
>> >>> flatMap that can be used to generate the specific datasets?
>> >>> Doesn't this cause problems for Flink? The classes can be very
>> >>> different from each other, so maybe this can cause problems with the
>> >>> plan generation, can't it?
>> >>>
>> >>> Thanks Fabian and Stephan for the support!
>> >>>
>> >>>
>> >>> On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <se...@apache.org>
>> >>> wrote:
>> >>>>
>> >>>> Why not use an abstract base class and N subclasses?
>> >>>>
>> >>>> On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fh...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> Unfortunately, there is no Either<1,...,n>.
>> >>>>> You could implement something like a Tuple3<Option<Type1>,
>> >>>>> Option<Type2>, Option<Type3>>. However, Flink does not provide an
>> >>>>> Option
>> >>>>> type (comes with Java8). You would need to implement it yourself
>> >>>>> incl.
>> >>>>> TypeInfo and Serializer. You can get some inspiration from the
>> >>>>> Either type
>> >>>>> info /serializer, if you want to go this way.
>> >>>>>
>> >>>>> Using a byte array would also work but doesn't look much easier than
>> >>>>> the Option approach to me.
>> >>>>>
>> >>>>> 2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>> >>>>>>
>> >>>>>> Yes, the intermediate dataset I create then join again between
>> >>>>>> themselves. What I'd need is a Either<1,...,n>. Is that possible to
>> >>>>>> add?
>> >>>>>> Otherwise I was thinking to generate a Tuple2<String,byte[]> and in
>> >>>>>> the subsequent filter+map/flatMap deserialize only those elements I
>> >>>>>> want to
>> >>>>>> group togheter (e.g. t.f0=="someEventType") in order to generate
>> >>>>>> the typed
>> >>>>>> dataset based.
>> >>>>>> Which one  do you think is the best solution?
>> >>>>>>
>> >>>>>> On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fh...@gmail.com>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> Hi Flavio,
>> >>>>>>>
>> >>>>>>> I did not completely understand which objects should go where, but
>> >>>>>>> here are some general guidelines:
>> >>>>>>>
>> >>>>>>> - early filtering is mostly a good idea (unless evaluating the
>> >>>>>>> filter
>> >>>>>>> expression is very expensive)
>> >>>>>>> - you can use a flatMap function to combine a map and a filter
>> >>>>>>> - applying multiple functions on the same data set does not
>> >>>>>>> necessarily materialize the data set (in memory or on disk). In
>> >>>>>>> most cases
>> >>>>>>> it prevents chaining, hence there is serialization overhead. In
>> >>>>>>> some cases
>> >>>>>>> where the forked data streams are joined again, the data set must
>> >>>>>>> be
>> >>>>>>> materialized in order to avoid deadlocks.
>> >>>>>>> - it is not possible to write a map that generates two different
>> >>>>>>> types, but you could implement a mapper that returns an
>> >>>>>>> Either<First,
>> >>>>>>> Second> type.
>> >>>>>>>
>> >>>>>>> Hope this helps,
>> >>>>>>> Fabian
>> >>>>>>>
>> >>>>>>> 2016-02-10 8:43 GMT+01:00 Flavio Pompermaier
>> >>>>>>> <po...@okkam.it>:
>> >>>>>>>>
>> >>>>>>>> Any help on this?
>> >>>>>>>>
>> >>>>>>>> On 9 Feb 2016 18:03, "Flavio Pompermaier" <po...@okkam.it>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi to all,
>> >>>>>>>>>
>> >>>>>>>>> in my program I have a Dataset that generated different types of
>> >>>>>>>>> object wrt the incoming element.
>> >>>>>>>>> Thus it's like a Map<Tuple2,Object>.
>> >>>>>>>>> In order to type the different generated datasets I do
>> >>>>>>>>> something:
>> >>>>>>>>>
>> >>>>>>>>> Dataset<Tuple2> start =...
>> >>>>>>>>>
>> >>>>>>>>> Dataset<MyObj1> ds1 = start.filter().map(..);
>> >>>>>>>>> Dataset<MyObj1> ds2 = start.filter().map(..);
>> >>>>>>>>> Dataset<MyObj3> ds3 = start.filter().map(..);
>> >>>>>>>>> Dataset<MyObj3> ds4 = start.filter().map(..);
>> >>>>>>>>>
>> >>>>>>>>> However this is very inefficient (I think because Flink needs to
>> >>>>>>>>> materialize the entire source dataset for every slot).
>> >>>>>>>>>
>> >>>>>>>>> It's much more efficient to group the generation of objects of
>> >>>>>>>>> the
>> >>>>>>>>> same type. E.g.:
>> >>>>>>>>>
>> >>>>>>>>> Dataset<Tuple2> start =..
>> >>>>>>>>>
>> >>>>>>>>> Dataset<MyObj1> tmp1 = start.map(..);
>> >>>>>>>>> Dataset<MyObj3> tmp2 = start.map(..);
>> >>>>>>>>> Dataset<MyObj1> ds1 = tmp1.filter();
>> >>>>>>>>> Dataset<MyObj1> ds2 = tmp1.filter();
>> >>>>>>>>> Dataset<MyObj3> ds3 = tmp2.filter();
>> >>>>>>>>> Dataset<MyObj3> ds4 = tmp2.filter();
>> >>>>>>>>>
>> >>>>>>>>> Increasing the number of slots per task manager make things
>> >>>>>>>>> worse
>> >>>>>>>>> and worse :)
>> >>>>>>>>> Is there a way to improve this situation? Is it possible to
>> >>>>>>>>> write a
>> >>>>>>>>> "map" generating different type of object and then filter them
>> >>>>>>>>> by generated
>> >>>>>>>>> class type?
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Flavio
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Max,
why do I need to register them? My job also runs without problems without
that.
The only problem with my POJOs was that I had to implement equals and
hashCode correctly. Flink didn't force me to do it, but then the results
were wrong :(


On Wed, Feb 17, 2016 at 10:16 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Flavio,
>
> Stephan was referring to
>
> env.registerType(ExtendedClass1.class);
> env.registerType(ExtendedClass2.class);
>
> Cheers,
> Max

Re: Dataset filter improvement

Posted by Maximilian Michels <mx...@apache.org>.
Hi Flavio,

Stephan was referring to

env.registerType(ExtendedClass1.class);
env.registerType(ExtendedClass2.class);

Cheers,
Max

On Wed, Feb 10, 2016 at 12:48 PM, Flavio Pompermaier
<po...@okkam.it> wrote:
> What do you mean exactly? I'm probably missing something here. Remember
> that I can specify the right subclass only after the last flatMap; after the
> first map neither I nor Flink can know the exact subclass of BaseClass.

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
What do you mean exactly? I'm probably missing something here. Remember
that I can specify the right subclass only after the last flatMap; after
the first map neither I nor Flink can know the exact subclass of BaseClass.

On Wed, Feb 10, 2016 at 12:42 PM, Stephan Ewen <se...@apache.org> wrote:

> Class hierarchies should definitely work, even if the base class has no
> fields.
>
> They work more efficiently if you register the subclasses at the execution
> environment (Flink cannot infer them from the function signatures because
> the function signatures only contain the abstract base class).

Re: Dataset filter improvement

Posted by Stephan Ewen <se...@apache.org>.
Class hierarchies should definitely work, even if the base class has no
fields.

They work more efficiently if you register the subclasses at the execution
environment (Flink cannot infer them from the function signatures because
the function signatures only contain the abstract base class).
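
To illustrate why registration can help, here is a toy model in plain Java. It is not Flink's serializer and makes no claim about Flink's actual wire format. It assumes a hypothetical `TypeTagWriter` that spends one byte on a small ID for registered classes and otherwise falls back to writing the full class name, which mirrors the effect described earlier in this thread (a shorter ID instead of the complete class name). The stand-in classes reuse the names from the thread with simplified fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-ins for the classes discussed in the thread.
abstract class BaseClass { public String someField; }
class ExtendedClass1 extends BaseClass { public String someOtherField11; }
class ExtendedClass2 extends BaseClass { public Integer someOtherField21; }

// Toy model only: a hypothetical helper, NOT part of Flink.
class TypeTagWriter {
    private final Map<Class<?>, Integer> registered = new LinkedHashMap<>();

    // Loosely analogous to env.registerType(...): assign the next small ID.
    void registerType(Class<?> type) {
        registered.putIfAbsent(type, registered.size());
    }

    // Bytes this toy format spends on identifying the runtime class.
    int tagSizeInBytes(Object value) {
        Class<?> type = value.getClass();
        if (registered.containsKey(type)) {
            return 1; // one byte holding the registered ID
        }
        // Unregistered: one marker byte plus the full class name.
        return 1 + type.getName().getBytes(StandardCharsets.UTF_8).length;
    }
}

class TypeTagDemo {
    public static void main(String[] args) {
        TypeTagWriter writer = new TypeTagWriter();
        writer.registerType(ExtendedClass1.class);

        // Registered subclass: compact tag.
        System.out.println(writer.tagSizeInBytes(new ExtendedClass1()));
        // Unregistered subclass: tag grows with the class name length.
        System.out.println(writer.tagSizeInBytes(new ExtendedClass2()));
    }
}
```

In this model the per-record saving is the length of the class name, so it matters most when records are small and numerous.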

On Wed, Feb 10, 2016 at 12:23 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Because the classes are not related to each other. Do you think it's a
> good idea to have something like this?
>
> abstract class BaseClass {
>    String someField;
> }
>
> class ExtendedClass1 extends BaseClass {
>    String someOtherField11;
>    String someOtherField12;
>    String someOtherField13;
>  ...
> }
>
> class ExtendedClass2 extends BaseClass {
>    Integer someOtherField21;
>    Double someOtherField22;
>    Integer someOtherField23;
>  ...
> }
>
> and then declare my map as Map<Tuple2,BaseClass>, and then apply a
> flatMap that can be used to generate the specific datasets?
> Doesn't this cause problems for Flink? The classes can be very different
> from each other; maybe this can cause problems with the plan generation?
>
> Thanks Fabian and Stephan for the support!

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
Because the classes are not related to each other. Do you think it's a good
idea to have something like this?

abstract class BaseClass {
   String someField;
}

class ExtendedClass1 extends BaseClass {
   String someOtherField11;
   String someOtherField12;
   String someOtherField13;
 ...
}

class ExtendedClass2 extends BaseClass {
   Integer someOtherField21;
   Double someOtherField22;
   Integer someOtherField23;
 ...
}

and then declare my map as Map<Tuple2,BaseClass>, and then apply a flatMap
that can be used to generate the specific datasets?
Doesn't this cause problems for Flink? The classes can be very different from
each other; maybe this can cause problems with the plan generation?
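
The idea can be sketched in plain Java, using lists and streams as a stand-in for datasets. This is an analogy under stated assumptions, not Flink DataSet code: the helper `select` and the class `SplitBySubclass` are hypothetical, and the subclasses are reduced to one field each. A single mixed collection typed as BaseClass is split into typed collections by combining a filter on the runtime class with a cast, which is the filter-plus-map combination a flatMap can express.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

abstract class BaseClass { public String someField; }

class ExtendedClass1 extends BaseClass {
    public String someOtherField11;
    ExtendedClass1(String value) { someOtherField11 = value; }
}

class ExtendedClass2 extends BaseClass {
    public Integer someOtherField21;
    ExtendedClass2(Integer value) { someOtherField21 = value; }
}

class SplitBySubclass {
    // Keeps only elements of the requested subclass and casts them to it.
    static <T extends BaseClass> List<T> select(List<BaseClass> mixed, Class<T> type) {
        return mixed.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // One intermediate collection holding objects of different subclasses.
        List<BaseClass> mixed = Arrays.asList(
                new ExtendedClass1("a"), new ExtendedClass2(1), new ExtendedClass1("b"));

        List<ExtendedClass1> ds1 = select(mixed, ExtendedClass1.class);
        List<ExtendedClass2> ds2 = select(mixed, ExtendedClass2.class);
        System.out.println(ds1.size() + " " + ds2.size()); // prints "2 1"
    }
}
```

The subclasses share nothing beyond the base type here, so how different they are from each other does not affect the split itself.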

Thanks Fabian and Stephan for the support!


On Wed, Feb 10, 2016 at 11:47 AM, Stephan Ewen <se...@apache.org> wrote:

> Why not use an abstract base class and N subclasses?

Re: Dataset filter improvement

Posted by Stephan Ewen <se...@apache.org>.
Why not use an abstract base class and N subclasses?
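
A minimal sketch of the base-class idea in plain Java, without Flink on the classpath: one pass over the input produces elements of a common abstract type, and each typed collection is then obtained by filtering on the concrete class. The names `Event`, `toEvent` and `only`, the string tags and the `payload` field are assumptions made for illustration; in Flink the same shape would presumably be one map to a data set of the base type followed by a filter and cast per subclass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical common base type for everything the single map step can emit.
abstract class Event {
    final String payload;
    Event(String payload) { this.payload = payload; }
}

class MyObj1 extends Event {
    MyObj1(String payload) { super(payload); }
}

class MyObj3 extends Event {
    MyObj3(String payload) { super(payload); }
}

class BaseClassSketch {
    // The single "map": picks the concrete subclass from an (assumed) input tag.
    static Event toEvent(String tag, String value) {
        return tag.equals("type1") ? new MyObj1(value) : new MyObj3(value);
    }

    // The per-type "filter": keeps only instances of the requested class.
    static <T extends Event> List<T> only(List<Event> events, Class<T> type) {
        List<T> out = new ArrayList<>();
        for (Event e : events) {
            if (type.isInstance(e)) {
                out.add(type.cast(e));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> all = Arrays.asList(
                toEvent("type1", "a"), toEvent("type3", "b"), toEvent("type1", "c"));
        System.out.println(only(all, MyObj1.class).size()); // 2
        System.out.println(only(all, MyObj3.class).size()); // 1
    }
}
```

The source is mapped only once here; the cost moves to carrying a polymorphic element type through the intermediate result.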

On Wed, Feb 10, 2016 at 10:05 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Unfortunately, there is no Either<1,...,n>.
> You could implement something like a Tuple3<Option<Type1>, Option<Type2>,
> Option<Type3>>. However, Flink does not provide an Option type (one comes
> with Java 8). You would need to implement it yourself, including TypeInfo
> and Serializer. You can get some inspiration from the Either type
> info/serializer if you want to go this way.
>
> Using a byte array would also work but doesn't look much easier than the
> Option approach to me.

Re: Dataset filter improvement

Posted by Fabian Hueske <fh...@gmail.com>.
Unfortunately, there is no Either<1,...,n>.
You could implement something like a Tuple3<Option<Type1>, Option<Type2>,
Option<Type3>>. However, Flink does not provide an Option type (one comes
with Java 8). You would need to implement it yourself, including TypeInfo
and Serializer. You can get some inspiration from the Either type
info/serializer if you want to go this way.

Using a byte array would also work but doesn't look much easier than the
Option approach to me.
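
To make the shape of the Option idea concrete, here is a rough plain-Java sketch of a hand-rolled `Option` and a three-slot holder in which exactly one slot is filled. `OneOf3` is a hypothetical stand-in for `Tuple3<Option<Type1>, Option<Type2>, Option<Type3>>`; the Flink-specific TypeInfo and Serializer mentioned above are not shown and would still have to be written.

```java
import java.util.NoSuchElementException;

// Hand-rolled Option: either holds a non-null value or is empty.
final class Option<T> {
    private final T value; // null means "empty"

    private Option(T value) { this.value = value; }

    static <T> Option<T> some(T value) {
        if (value == null) {
            throw new IllegalArgumentException("value must not be null");
        }
        return new Option<>(value);
    }

    static <T> Option<T> none() { return new Option<>(null); }

    boolean isDefined() { return value != null; }

    T get() {
        if (value == null) {
            throw new NoSuchElementException("empty Option");
        }
        return value;
    }
}

// Hypothetical stand-in for Tuple3<Option<A>, Option<B>, Option<C>>.
final class OneOf3<A, B, C> {
    final Option<A> f0;
    final Option<B> f1;
    final Option<C> f2;

    private OneOf3(Option<A> f0, Option<B> f1, Option<C> f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }

    static <A, B, C> OneOf3<A, B, C> first(A a) {
        return new OneOf3<A, B, C>(Option.some(a), Option.<B>none(), Option.<C>none());
    }

    static <A, B, C> OneOf3<A, B, C> second(B b) {
        return new OneOf3<A, B, C>(Option.<A>none(), Option.some(b), Option.<C>none());
    }

    static <A, B, C> OneOf3<A, B, C> third(C c) {
        return new OneOf3<A, B, C>(Option.<A>none(), Option.<B>none(), Option.some(c));
    }
}

class OptionTupleSketch {
    public static void main(String[] args) {
        OneOf3<String, Integer, Double> x = OneOf3.second(42);
        System.out.println(x.f0.isDefined()); // false
        System.out.println(x.f1.get());       // 42
    }
}
```

A downstream filter would test `f0.isDefined()`, `f1.isDefined()` or `f2.isDefined()` and then unwrap the matching slot.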

2016-02-10 9:47 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Yes, the intermediate datasets I create are then joined with each other
> again. What I'd need is an Either<1,...,n>. Is that possible to add?
> Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the
> subsequent filter+map/flatMap, deserializing only those elements I want to
> group together (e.g. t.f0.equals("someEventType")) in order to generate the
> typed dataset.
> Which one do you think is the best solution?

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
Yes, the intermediate datasets I create are then joined with each other
again. What I'd need is an Either<1,...,n>. Is that possible to add?
Otherwise I was thinking of generating a Tuple2<String,byte[]> and, in the
subsequent filter+map/flatMap, deserializing only those elements I want to
group together (e.g. t.f0.equals("someEventType")) in order to generate the
typed dataset.
Which one do you think is the best solution?
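
The tagged byte array idea could look roughly like this in plain Java. `Tagged` is a hypothetical stand-in for `Tuple2<String,byte[]>`, and Java serialization is used only to keep the sketch self-contained; which serializer would be used in a real job is an assumption left open here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Tuple2<String, byte[]>: f0 is the type tag, f1 the serialized object.
final class Tagged {
    final String f0;
    final byte[] f1;
    Tagged(String f0, byte[] f1) { this.f0 = f0; this.f1 = f1; }
}

class TaggedBytesSketch {
    // Serializes the object once and attaches the tag used for later filtering.
    static Tagged wrap(String tag, Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Tagged(tag, bytes.toByteArray());
    }

    // Combined filter + map: only records with the wanted tag are deserialized.
    static <T> List<T> select(List<Tagged> input, String tag, Class<T> type) {
        List<T> result = new ArrayList<>();
        for (Tagged t : input) {
            if (!tag.equals(t.f0)) { // equals(), since == compares references
                continue;
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(t.f1))) {
                result.add(type.cast(in.readObject()));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Tagged> all = new ArrayList<>();
        all.add(wrap("someEventType", "hello"));
        all.add(wrap("otherEventType", Integer.valueOf(7)));
        System.out.println(select(all, "someEventType", String.class)); // [hello]
    }
}
```

Records with a non-matching tag are skipped without paying the deserialization cost, which is the point of the approach.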

On Wed, Feb 10, 2016 at 9:40 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Flavio,
>
> I did not completely understand which objects should go where, but here
> are some general guidelines:
>
> - early filtering is mostly a good idea (unless evaluating the filter
> expression is very expensive)
> - you can use a flatMap function to combine a map and a filter
> - applying multiple functions on the same data set does not necessarily
> materialize the data set (in memory or on disk). In most cases it prevents
> chaining, hence there is serialization overhead. In some cases where the
> forked data streams are joined again, the data set must be materialized in
> order to avoid deadlocks.
> - it is not possible to write a map that generates two different types,
> but you could implement a mapper that returns an Either<First, Second> type.
>
> Hope this helps,
> Fabian

Re: Dataset filter improvement

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

I did not completely understand which objects should go where, but here are
some general guidelines:

- early filtering is mostly a good idea (unless evaluating the filter
expression is very expensive)
- you can use a flatMap function to combine a map and a filter
- applying multiple functions on the same data set does not necessarily
materialize the data set (in memory or on disk). In most cases it prevents
chaining, hence there is serialization overhead. In some cases where the
forked data streams are joined again, the data set must be materialized in
order to avoid deadlocks.
- it is not possible to write a map that generates two different types, but
you could implement a mapper that returns an Either<First, Second> type.
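
As a rough illustration of the flatMap and Either points above, using plain Java streams rather than Flink: a single flatMap-style step drops unwanted records (the filter) and wraps each remaining one in a two-way union (the map). The `Either` class below is a hand-rolled stand-in, not Flink's own Either type, and `parse` with its digits-versus-text rule is a made-up example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Minimal two-way union; exactly one of left/right is non-null.
final class Either<L, R> {
    final L left;
    final R right;

    private Either(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> Either<L, R> left(L value) { return new Either<L, R>(value, null); }

    static <L, R> Either<L, R> right(R value) { return new Either<L, R>(null, value); }

    boolean isLeft() { return left != null; }
}

class EitherFlatMapSketch {
    // flatMap-style step: an empty stream filters the record out,
    // a one-element stream maps it to a typed result.
    static Stream<Either<Integer, String>> parse(String raw) {
        if (raw.isEmpty()) {
            return Stream.empty();
        }
        if (raw.chars().allMatch(Character::isDigit)) {
            return Stream.of(Either.<Integer, String>left(Integer.parseInt(raw)));
        }
        return Stream.of(Either.<Integer, String>right(raw));
    }

    public static void main(String[] args) {
        List<Either<Integer, String>> mixed = Stream.of("12", "", "abc", "7")
                .flatMap(EitherFlatMapSketch::parse)
                .collect(Collectors.toList());

        // Split the mixed result into two typed collections.
        List<Integer> numbers = mixed.stream().filter(Either::isLeft)
                .map(e -> e.left).collect(Collectors.toList());
        List<String> words = mixed.stream().filter(e -> !e.isLeft())
                .map(e -> e.right).collect(Collectors.toList());

        System.out.println(numbers); // [12, 7]
        System.out.println(words);   // [abc]
    }
}
```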

Hope this helps,
Fabian

2016-02-10 8:43 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Any help on this?

Re: Dataset filter improvement

Posted by Flavio Pompermaier <po...@okkam.it>.
Any help on this?
On 9 Feb 2016 18:03, "Flavio Pompermaier" <po...@okkam.it> wrote:

> Hi to all,
>
> in my program I have a Dataset that generates different types of objects
> depending on the incoming element.
> Thus it's like a Map<Tuple2,Object>.
> In order to type the different generated datasets I do something like:
>
> Dataset<Tuple2> start =...
>
> Dataset<MyObj1> ds1 = start.filter().map(..);
> Dataset<MyObj1> ds2 = start.filter().map(..);
> Dataset<MyObj3> ds3 = start.filter().map(..);
> Dataset<MyObj3> ds4 = start.filter().map(..);
>
> However this is very inefficient (I think because Flink needs to
> materialize the entire source dataset for every slot).
>
> It's much more efficient to group the generation of objects of the same
> type. E.g.:
>
> Dataset<Tuple2> start =..
>
> Dataset<MyObj1> tmp1 = start.map(..);
> Dataset<MyObj3> tmp2 = start.map(..);
> Dataset<MyObj1> ds1 = tmp1.filter();
> Dataset<MyObj1> ds2 = tmp1.filter();
> Dataset<MyObj3> ds3 = tmp2.filter();
> Dataset<MyObj3> ds4 = tmp2.filter();
>
> Increasing the number of slots per task manager makes things worse and
> worse :)
> Is there a way to improve this situation? Is it possible to write a "map"
> generating different types of objects and then filter them by generated
> class type?
>
> Best,
> Flavio