Posted to user@beam.apache.org by "Guagliardo, Patrizio via user" <us...@beam.apache.org> on 2023/04/24 15:22:44 UTC

[Question] - Time series - cumulative sum in right order with python api in a batch process

Hi all,

I want to compute a cumulative sum over a time series in a bounded (batch) pipeline in Apache Beam with the Python API. A cumulative sum can be written with a stateful DoFn, but that does not give the right result when the data is unordered, which is the case in a PCollection. Is there a way to compute the cumulative sum over time in a batch process? This is what I did (without ordering):
import apache_beam as beam
from apache_beam import TimeDomain
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec
from apache_beam.transforms.window import FixedWindows, GlobalWindows


class TimestampedSumAccumulator(beam.DoFn):
    SUM_STATE = 'sum'

    def process(
        self, element,
        timestamp=beam.DoFn.TimestampParam,
        sum_state=beam.DoFn.StateParam(ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
    ):
        sum_value = sum_state.read() or 0.0
        # print(element)
        sum_value += element[1]
        sum_state.write(sum_value)
        yield beam.transforms.window.TimestampedValue(sum_value, timestamp)


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (1.5, 1),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2)
        ])
        | 'AddTimestamps' >> beam.Map(lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
        | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(10))
        | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
        | 'Print' >> beam.Map(print))

How could that be done to make the cumulative sum in the “right” order?

Thank you very much in advance.
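
To make the ordering problem concrete, here is a small plain-Python sketch (no Beam involved) using the sample data from the pipeline above. It assumes, purely for illustration, that elements arrive in the order they are listed; a runner is free to deliver them in any order. The names arrivals, arrival_sums and ordered_sums are hypothetical.

```python
from itertools import accumulate

# (value, event_time) pairs, listed in the order used in the question.
arrivals = [(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)]

# Running sum in arrival order: what a per-element accumulator would emit
# if the elements happened to arrive exactly like this.
arrival_sums = list(accumulate(value for value, _ in arrivals))

# Running sum after sorting by event time: the series that is wanted.
by_time = sorted(arrivals, key=lambda pair: pair[1])
ordered_sums = list(accumulate(value for value, _ in by_time))

for (value, t), total in zip(by_time, ordered_sums):
    print(f"t={t} value={value} cumulative={total:.1f}")
```

Both series end at the same total, but every intermediate value differs, which is why the elements have to be sorted before accumulating.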


________________________________
This e-mail and any attachments may be confidential or legally privileged. If you received this message in error or are not the intended recipient, you should destroy the e-mail message and any attachments or copies, and you are prohibited from retaining, distributing, disclosing or using any information contained herein. Please inform us of the erroneous delivery by return e-mail. Thank you for your cooperation. For more information on how we use your personal data please see our Privacy Notice<https://www.oliverwyman.com/policies/privacy-notice.html>.

Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

there is a (rather old and long) discussion of this for the Java SDK in [1].
That discussion resulted in adding the @RequiresTimeSortedInput annotation
[2]. Unfortunately, this has probably not been carried over to the Python SDK.

I'll sum up reasons why it was added:

  a) inputs to stateful DoFn are naturally unsorted

  b) batch Pipelines have two options how to feed data to stateful DoFn:

   b1) unsorted, feed as data arrive

   b2) explicitly sort by timestamp (or correlated field in data, e.g. 
sequential index, if provided)

In case b1) there is no way to advance the watermark before *all* data has
been read from the input: advancing it earlier might produce late data with
arbitrary lateness (which would be a consistency bug).

In case b2) it would be possible to advance the watermark (and thus fire
event-time timers).

Case b2) was judged too restrictive to be added to the model as a
requirement for batch pipelines, which is totally reasonable. We are
therefore left with b1), which means that a use case like yours requires
first reading the complete batch data into state, then sorting it manually,
and only then processing the ordered data. This requires a lot of coding
(which could certainly be wrapped into a reusable PTransform), but it is
also inefficient, because a pure batch runner is likely to perform a
merge-sort grouping anyway. It only needs to know that it should keep this
guarantee for the DoFn (and add the timestamp to the sorting key). This is
the reason why the annotation was introduced: to keep the Beam model as
flexible as possible while enabling runners to make use of sorting they
already do anyway, in case it is needed by the application logic.

I think that until equivalent information can be provided to a DoFn in the
Python SDK, the only option is buffering and manually sorting the complete
data set (split per key).

  Jan

[1] https://lists.apache.org/thread/7ryqg3bm1c3bs7g1nk4krnrjxlkd7srn
[2] 
https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
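
The "buffer, sort, then process" approach described above can be sketched in plain Python as follows. This is only an illustration of the logic, not Beam code: SortedCumulativeSum, add and flush are hypothetical names, the dictionary stands in for per-key state, and flush stands in for whatever signals that all data for a key has been seen (for example the end of the window).

```python
from collections import defaultdict


class SortedCumulativeSum:
    """Hypothetical helper: buffers per key, then emits sums in time order."""

    def __init__(self):
        # Stand-in for per-key state: key -> list of (timestamp, value).
        self._buffers = defaultdict(list)

    def add(self, key, timestamp, value):
        # Elements may arrive in any order, so they are only buffered here.
        self._buffers[key].append((timestamp, value))

    def flush(self, key):
        # Call once all data for the key has been seen; sorts and accumulates.
        running = 0.0
        for timestamp, value in sorted(self._buffers.pop(key, [])):
            running += value
            yield key, timestamp, running


acc = SortedCumulativeSum()
for value, ts in [(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)]:
    acc.add('sum_key', ts, value)

results = list(acc.flush('sum_key'))
for key, ts, total in results:
    print(key, ts, round(total, 1))
```

The cost is that the whole per-key data set sits in the buffer before anything is emitted, which is the inefficiency mentioned above.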


RE: [Question] - Time series - cumulative sum in right order with python api in a batch process

Posted by "Guagliardo, Patrizio via user" <us...@beam.apache.org>.
OK great, so what I did in the end was:

def cumulative_sums(key, timestamped_values):
  running = 0
  for x in sorted(timestamped_values, key=lambda x: x[1]):
    running += x[0]
    yield key, running


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2),
            (1.5, 6)
        ])
        | 'AddTimestamps' >> beam.Map(lambda x: beam.transforms.window.TimestampedValue(x, x[1]))
        | beam.Map(lambda x: ('key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(11))
        | beam.GroupByKey()
        | beam.FlatMapTuple(cumulative_sums)
        | 'Print' >> beam.Map(print))

However, I am wondering whether there is a way to carry state from one window to the next. I ask because I would also like to do other transformations where a value from one time step is needed at the next, for example calculating time differences or filling in missing values (taking the value from the previous time step). Can that be done? I have read something about looping timers, but could not find the details for Python. Is there a way to do this?

Thanks a lot.
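
For the two transformations mentioned (time differences and filling in missing values), one possibility is to reuse the sort-then-iterate pattern of cumulative_sums above. The sketch below is plain Python; time_diffs and forward_fill are hypothetical names, the input uses the same (value, timestamp) layout as the pipeline above, and a missing value is assumed to be represented as None. It only carries values between time steps that end up in the same group, so it does not by itself carry state across windows.

```python
def time_diffs(key, values_with_ts):
    """Yield (key, timestamp, difference to the previous timestamp)."""
    previous_ts = None
    for _, ts in sorted(values_with_ts, key=lambda x: x[1]):
        # The first element has no predecessor, so its difference is None.
        yield key, ts, None if previous_ts is None else ts - previous_ts
        previous_ts = ts


def forward_fill(key, values_with_ts):
    """Yield (key, timestamp, value), replacing None by the last seen value."""
    last_seen = None
    for value, ts in sorted(values_with_ts, key=lambda x: x[1]):
        if value is not None:
            last_seen = value
        yield key, ts, last_seen


# Same layout as in the pipeline above, with one missing value and one gap.
group = [(3.1, 3), (None, 4), (5.4, 5), (2.3, 2), (1.5, 8)]

diffs = list(time_diffs('key', group))
filled = list(forward_fill('key', group))
print(diffs)
print(filled)
```

Either generator has the same (key, values) signature as cumulative_sums, so it could be passed to beam.FlatMapTuple after the GroupByKey in its place.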


Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all
the data up to that point (modulo late data) and can then confidently
compute your ordered cumulative sum.

You could do something like this:

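def cumulative_sums(key, timestamped_values):
  running = 0
  # Sort the (timestamp, value) pairs by timestamp, then accumulate.
  for _, x in sorted(timestamped_values):
    running += x
    yield running

# Assumes timestamped_data holds (key, value) pairs, so that GroupByKey
# groups by key rather than by timestamp.
sums = (timestamped_data
  | beam.Map(lambda kv, t=beam.DoFn.TimestampParam: (kv[0], (t, kv[1])))
  | beam.WindowInto(...)
  | beam.GroupByKey()
  | beam.FlatMapTuple(cumulative_sums))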


