You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Matt <dr...@gmail.com> on 2016/12/22 21:04:46 UTC

Caching collected objects in .apply()

Hello,

I have a window processing 10 objects at a time, and creating 1 as a
result. The problem is in order to create that object I need the object
from the previous window.

I'm doing this:

stream
  .keyBy(...some key...)
  .countWindow(10, 1)
  .apply(...creates an element A...)
  .keyBy(...same key as above...)
  .countWindow(2, 1)
  .apply(...updates A with the value of the previous element A...)
  .addSink(...)

Probably there is a way to retrieve the last collected object inside the
first .apply(), or to cache it somehow.

Is there a better way to achieve the same? How inefficient is this?

Regards,
Matt

Re: Caching collected objects in .apply()

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think your approach with two window() operations is fine. There is no way
to retrieve the result from a previous window because it is not strictly
defined what the previous window is. Also, keeping data inside your user
functions (in fields) is problematic because these function instances are
reused to process elements for several different keys.

Cheers,
Aljoscha

On Thu, 5 Jan 2017 at 11:09 Fabian Hueske <fh...@gmail.com> wrote:

> Hi Matt,
>
> I think your approach should be fine.
> Although the second keyBy is logically a shuffle, the data will not be
> sent of the wire to a different machine if the parallelism of the first and
> second window operator are identical.
> It only cost one serialization / deserialization step.
>
> I would be careful about putting the result of the first window into
> operator state. I think it is not well defined how function objects are
> reused. This might be an internal implementation detail which might change
> in the future.
> Aljoscha (in CC) should know more about how the window function objects
> are used.
>
> Best, Fabian
>
> 2017-01-05 10:06 GMT+01:00 Matt <dr...@gmail.com>:
>
> I'm still looking for an answer to this question. Hope you can give me
> some insight!
>
> On Thu, Dec 22, 2016 at 6:17 PM, Matt <dr...@gmail.com> wrote:
>
> Just to be clear, the stream is of String elements. The first part of the
> pipeline (up to the first .apply) receives those strings, and returns
> objects of another class ("A" let's say).
>
> On Thu, Dec 22, 2016 at 6:04 PM, Matt <dr...@gmail.com> wrote:
>
> Hello,
>
> I have a window processing 10 objects at a time, and creating 1 as a
> result. The problem is in order to create that object I need the object
> from the previous window.
>
> I'm doing this:
>
> stream
>   .keyBy(...some key...)
>   .countWindow(10, 1)
>   .apply(...creates an element A...)
>   .keyBy(...same key as above...)
>   .countWindow(2, 1)
>   .apply(...updates A with the value of the previous element A...)
>   .addSink(...)
>
> Probably there is a way to retrieve the last collected object inside the
> first .apply(), or to cache it somehow.
>
> Is there a better way to achieve the same? How inefficient is this?
>
> Regards,
> Matt
>
>
>
>
>

Re: Caching collected objects in .apply()

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Matt,

I think your approach should be fine.
Although the second keyBy is logically a shuffle, the data will not be sent
of the wire to a different machine if the parallelism of the first and
second window operator are identical.
It only cost one serialization / deserialization step.

I would be careful about putting the result of the first window into
operator state. I think it is not well defined how function objects are
reused. This might be an internal implementation detail which might change
in the future.
Aljoscha (in CC) should know more about how the window function objects are
used.

Best, Fabian

2017-01-05 10:06 GMT+01:00 Matt <dr...@gmail.com>:

> I'm still looking for an answer to this question. Hope you can give me
> some insight!
>
> On Thu, Dec 22, 2016 at 6:17 PM, Matt <dr...@gmail.com> wrote:
>
>> Just to be clear, the stream is of String elements. The first part of the
>> pipeline (up to the first .apply) receives those strings, and returns
>> objects of another class ("A" let's say).
>>
>> On Thu, Dec 22, 2016 at 6:04 PM, Matt <dr...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a window processing 10 objects at a time, and creating 1 as a
>>> result. The problem is in order to create that object I need the object
>>> from the previous window.
>>>
>>> I'm doing this:
>>>
>>> stream
>>>   .keyBy(...some key...)
>>>   .countWindow(10, 1)
>>>   .apply(...creates an element A...)
>>>   .keyBy(...same key as above...)
>>>   .countWindow(2, 1)
>>>   .apply(...updates A with the value of the previous element A...)
>>>   .addSink(...)
>>>
>>> Probably there is a way to retrieve the last collected object inside the
>>> first .apply(), or to cache it somehow.
>>>
>>> Is there a better way to achieve the same? How inefficient is this?
>>>
>>> Regards,
>>> Matt
>>>
>>
>>
>

Re: Caching collected objects in .apply()

Posted by Matt <dr...@gmail.com>.
I'm still looking for an answer to this question. Hope you can give me some
insight!

On Thu, Dec 22, 2016 at 6:17 PM, Matt <dr...@gmail.com> wrote:

> Just to be clear, the stream is of String elements. The first part of the
> pipeline (up to the first .apply) receives those strings, and returns
> objects of another class ("A" let's say).
>
> On Thu, Dec 22, 2016 at 6:04 PM, Matt <dr...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a window processing 10 objects at a time, and creating 1 as a
>> result. The problem is in order to create that object I need the object
>> from the previous window.
>>
>> I'm doing this:
>>
>> stream
>>   .keyBy(...some key...)
>>   .countWindow(10, 1)
>>   .apply(...creates an element A...)
>>   .keyBy(...same key as above...)
>>   .countWindow(2, 1)
>>   .apply(...updates A with the value of the previous element A...)
>>   .addSink(...)
>>
>> Probably there is a way to retrieve the last collected object inside the
>> first .apply(), or to cache it somehow.
>>
>> Is there a better way to achieve the same? How inefficient is this?
>>
>> Regards,
>> Matt
>>
>
>

Re: Caching collected objects in .apply()

Posted by Matt <dr...@gmail.com>.
Just to be clear, the stream is of String elements. The first part of the
pipeline (up to the first .apply) receives those strings, and returns
objects of another class ("A" let's say).

On Thu, Dec 22, 2016 at 6:04 PM, Matt <dr...@gmail.com> wrote:

> Hello,
>
> I have a window processing 10 objects at a time, and creating 1 as a
> result. The problem is in order to create that object I need the object
> from the previous window.
>
> I'm doing this:
>
> stream
>   .keyBy(...some key...)
>   .countWindow(10, 1)
>   .apply(...creates an element A...)
>   .keyBy(...same key as above...)
>   .countWindow(2, 1)
>   .apply(...updates A with the value of the previous element A...)
>   .addSink(...)
>
> Probably there is a way to retrieve the last collected object inside the
> first .apply(), or to cache it somehow.
>
> Is there a better way to achieve the same? How inefficient is this?
>
> Regards,
> Matt
>