You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by bi...@tutanota.com on 2017/05/01 16:13:04 UTC

Efficiency question

I have been trying to figure out the potential efficiency of sliding windows. Looking at the TrafficRoutes example - https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java -  it seems that the GatherStats class explicitly sorts its data (in event-time order) within every window for every key. (Collections.sort(infoList)). 
Is this necessary? If the data for each key arrives in event-time order and that order is maintained as the data flows through the pipeline, then the data within each window should already be sorted. For large sliding windows with small lags/sliding offsets re-sorting is going to be very inefficient. Or is it the case in Beam/DataFlow that even if the underlying data stream is ordered, there are no guarantees to the ordering of the data after a window transform or GroupByKey has been applied? 
Thanks,
Bill.

Re: Efficiency question

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, May 1, 2017 at 10:46 AM,  <bi...@tutanota.com> wrote:
> Yes, I understand there's no explicit underlying time-ordering within the
> stream. What I am getting at is that the notion of windowing in Beam and
> Dataflow does rely on there being at least an implicit weak ordering within
> the  stream (without it, you could never issue a watermark - essentially the
> assumption of  "useful" watermarks can be treated as the definition of
> weak-ordering).

Yep.

> Often that weak ordering is in fact strong ordering in
> practice, yet we can't exploit it in that case.
>
> I am not sure maintaining a distributed total order would be more costly or
> necessary. For example you could distribute over key and then only require
> total order on a per-key basis.

Sometimes one has a total ordering (e.g. within a shard of a source)
but at each group-by-key one would have to re-establish this ordering
(as the upstream, formerly distributed elements were likely ordered
within different keys, on workers progressing at different rates).

Runners that can cheaply provide (or recognize) this property should
exploit it for their grouping. We don't have an API to expose this to
consumers of the GBK though.

> Anyway, thanks for the clarification - very helpful.

Anytime :)

> 1. May 2017 13:18 by robertwb@google.com:
>
>
> On Mon, May 1, 2017 at 9:53 AM, <bi...@tutanota.com> wrote:
>
>
> Thanks Thomas. That's good to know :) Are there any plans to support ordered
> sources?
>
>
> It's been discussed, but there are no concrete plans.
>
> It seems odd (to me at least) to have a stream-oriented
> computational model with support for grouping by time (windowing) and yet
> not provide hooks to exploit the same time-ordering within the stream.
>
>
> There is actually no underlying time-ordering within the stream. The
> elements are grouped by buffering elements as they come in and
> tracking a watermark (which is a signal that "all the data up to
> timestamp T has now been collected, see [1] for more details) to
> release completed groups downstream (depending on the triggering). A
> runner that actually guaranteed delivery time-ordered delivery of
> elements could provide this and group more efficiently, but that would
> likely impose a higher cost of maintaining a distributed total order.
>
> [1] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
>
> 1. May 2017 12:25 by tgroh@google.com:
>
>
> Within the Beam model, there is no guarantee about the ordering of any
> PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
> element timestamps or any other comparator. Runners aren't required to
> maintain any ordering provided by a source, and do not require sources to
> provide any ordering. As such, if you want to process data in sorted order,
> currently the only option is to explicitly sort the data.
>
> On Mon, May 1, 2017 at 9:13 AM, <bi...@tutanota.com> wrote:
>
>
> I have been trying to figure out the potential efficiency of sliding
> windows. Looking at the TrafficRoutes example -
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
> - it seems that the GatherStats class explicitly sorts its data (in
> event-time order) within every window for every key.
> (Collections.sort(infoList)).
>
> Is this necessary? If the data for each key arrives in event-time order
> and that order is maintained as the data flows through the pipeline, then
> the data within each window should already be sorted. For large sliding
> windows with small lags/sliding offsets re-sorting is going to be very
> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
> data stream is ordered, there are no guarantees to the ordering of the data
> after a window transform or GroupByKey has been applied?
>
> Thanks,
>
> Bill.
>
>

Re: Efficiency question

Posted by bi...@tutanota.com.
Yes, I understand there's no explicit underlying time-ordering within the stream. What I am getting at is that the notion of windowing in Beam and Dataflow does rely on there being at least an implicit weak ordering within the  stream (without it, you could never issue a watermark - essentially the assumption of  "useful" watermarks can be treated as the definition of weak-ordering). Often that weak ordering is in fact strong ordering in practice, yet we can't exploit it in that case.
I am not sure maintaining a distributed total order would be more costly or necessary. For example you could distribute over key and then only require total order on a per-key basis. Anyway, thanks for the clarification - very helpful.
--Bill


1. May 2017 13:18 by robertwb@google.com:


> On Mon, May 1, 2017 at 9:53 AM,  <> billsmith31415@tutanota.com> > wrote:
>>
>> Thanks Thomas. That's good to know :) Are there any plans to support ordered
>> sources?
>
> It's been discussed, but there are no concrete plans.
>
>> It seems odd (to me at least) to have a stream-oriented
>> computational model with support for grouping by time (windowing) and yet
>> not provide hooks to exploit the same time-ordering within the stream.
>
> There is actually no underlying time-ordering within the stream. The
> elements are grouped by buffering elements as they come in and
> tracking a watermark (which is a signal that "all the data up to
> timestamp T has now been collected, see [1] for more details) to
> release completed groups downstream (depending on the triggering). A
> runner that actually guaranteed delivery time-ordered delivery of
> elements could provide this and group more efficiently, but that would
> likely impose a higher cost of maintaining a distributed total order.
>
> [1] > https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
>
>> 1. May 2017 12:25 by >> tgroh@google.com>> :
>>
>>
>> Within the Beam model, there is no guarantee about the ordering of any
>> PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
>> element timestamps or any other comparator. Runners aren't required to
>> maintain any ordering provided by a source, and do not require sources to
>> provide any ordering. As such, if you want to process data in sorted order,
>> currently the only option is to explicitly sort the data.
>>
>> On Mon, May 1, 2017 at 9:13 AM, <>> billsmith31415@tutanota.com>> > wrote:
>>>
>>> I have been trying to figure out the potential efficiency of sliding
>>> windows. Looking at the TrafficRoutes example -
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
>>> -  it seems that the GatherStats class explicitly sorts its data (in
>>> event-time order) within every window for every key.
>>> (Collections.sort(infoList)).
>>>
>>> Is this necessary? If the data for each key arrives in event-time order
>>> and that order is maintained as the data flows through the pipeline, then
>>> the data within each window should already be sorted. For large sliding
>>> windows with small lags/sliding offsets re-sorting is going to be very
>>> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
>>> data stream is ordered, there are no guarantees to the ordering of the data
>>> after a window transform or GroupByKey has been applied?
>>>
>>> Thanks,
>>>
>>> Bill.
>>
>>

Re: Efficiency question

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, May 1, 2017 at 9:53 AM,  <bi...@tutanota.com> wrote:
>
> Thanks Thomas. That's good to know :) Are there any plans to support ordered
> sources?

It's been discussed, but there are no concrete plans.

> It seems odd (to me at least) to have a stream-oriented
> computational model with support for grouping by time (windowing) and yet
> not provide hooks to exploit the same time-ordering within the stream.

There is actually no underlying time-ordering within the stream. The
elements are grouped by buffering elements as they come in and
tracking a watermark (which is a signal that "all the data up to
timestamp T has now been collected, see [1] for more details) to
release completed groups downstream (depending on the triggering). A
runner that actually guaranteed delivery time-ordered delivery of
elements could provide this and group more efficiently, but that would
likely impose a higher cost of maintaining a distributed total order.

[1] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

> 1. May 2017 12:25 by tgroh@google.com:
>
>
> Within the Beam model, there is no guarantee about the ordering of any
> PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
> element timestamps or any other comparator. Runners aren't required to
> maintain any ordering provided by a source, and do not require sources to
> provide any ordering. As such, if you want to process data in sorted order,
> currently the only option is to explicitly sort the data.
>
> On Mon, May 1, 2017 at 9:13 AM, <bi...@tutanota.com> wrote:
>>
>> I have been trying to figure out the potential efficiency of sliding
>> windows. Looking at the TrafficRoutes example -
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java
>> -  it seems that the GatherStats class explicitly sorts its data (in
>> event-time order) within every window for every key.
>> (Collections.sort(infoList)).
>>
>> Is this necessary? If the data for each key arrives in event-time order
>> and that order is maintained as the data flows through the pipeline, then
>> the data within each window should already be sorted. For large sliding
>> windows with small lags/sliding offsets re-sorting is going to be very
>> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
>> data stream is ordered, there are no guarantees to the ordering of the data
>> after a window transform or GroupByKey has been applied?
>>
>> Thanks,
>>
>> Bill.
>
>

Re: Efficiency question

Posted by bi...@tutanota.com.
Thanks Thomas. That's good to know :) Are there any plans to support ordered sources? It seems odd (to me at least) to have a stream-oriented computational model with support for grouping by time (windowing) and yet not provide hooks to exploit the same time-ordering within the stream.

1. May 2017 12:25 by tgroh@google.com:


> Within the Beam model, there is no guarantee about the ordering of any PCollection, nor the ordering of any Iterable produced by a GroupByKey, by element timestamps or any other comparator. Runners aren't required to maintain any ordering provided by a source, and do not require sources to provide any ordering. As such, if you want to process data in sorted order, currently the only option is to explicitly sort the data.
> On Mon, May 1, 2017 at 9:13 AM,  <> billsmith31415@tutanota.com> > wrote:
>
>>           >> I have been trying to figure out the potential efficiency of sliding windows. Looking at the TrafficRoutes example - >> https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java>>  -  it seems that the GatherStats class explicitly sorts its data (in event-time order) within every window for every key. (Collections.sort(infoList)). 
>> Is this necessary? If the data for each key arrives in event-time order and that order is maintained as the data flows through the pipeline, then the data within each window should already be sorted. For large sliding windows with small lags/sliding offsets re-sorting is going to be very inefficient. Or is it the case in Beam/DataFlow that even if the underlying data stream is ordered, there are no guarantees to the ordering of the data after a window transform or GroupByKey has been applied? 
>> Thanks,
>> Bill.>>   
>
>

Re: Efficiency question

Posted by Thomas Groh <tg...@google.com>.
Within the Beam model, there is no guarantee about the ordering of any
PCollection, nor the ordering of any Iterable produced by a GroupByKey, by
element timestamps or any other comparator. Runners aren't required to
maintain any ordering provided by a source, and do not require sources to
provide any ordering. As such, if you want to process data in sorted order,
currently the only option is to explicitly sort the data.

On Mon, May 1, 2017 at 9:13 AM, <bi...@tutanota.com> wrote:

> I have been trying to figure out the potential efficiency of sliding
> windows. Looking at the TrafficRoutes example - https://github.com/
> GoogleCloudPlatform/DataflowJavaSDK-examples/blob/
> master/src/main/java/com/google/cloud/dataflow/examples/complete/
> TrafficRoutes.java -  it seems that the GatherStats class explicitly
> sorts its data (in event-time order) within every window for every key.
> (Collections.sort(infoList)).
>
> Is this necessary? If the data for each key arrives in event-time order
> and that order is maintained as the data flows through the pipeline, then
> the data within each window should already be sorted. For large sliding
> windows with small lags/sliding offsets re-sorting is going to be very
> inefficient. Or is it the case in Beam/DataFlow that even if the underlying
> data stream is ordered, there are no guarantees to the ordering of the data
> after a window transform or GroupByKey has been applied?
>
> Thanks,
>
> Bill.
>