Posted to dev@beam.apache.org by Ruoyun Huang <ru...@google.com> on 2019/11/12 01:34:14 UTC

Behavior of TimestampCombiner?

Hi, Folks,

    I am trying to understand the behavior of TimestampCombiner. I have a
test like this:


    import unittest

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.test_stream import TestStream
    from apache_beam.testing.util import assert_that
    from apache_beam.testing.util import equal_to_per_window
    from apache_beam.transforms import window
    from apache_beam.transforms.window import TimestampCombiner
    from apache_beam.utils.timestamp import Timestamp


    class TimestampCombinerTest(unittest.TestCase):

      def test_combiner_latest(self):
        """Test TimestampCombiner with LATEST."""
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        p = TestPipeline(options=options)

        # Two elements for key 'k' in the window [0, 10), at timestamps 0 and 9.
        main_stream = (p
                       | 'main TestStream' >> TestStream()
                       .add_elements([window.TimestampedValue(('k', 100), 0)])
                       .add_elements([window.TimestampedValue(('k', 400), 9)])
                       .advance_watermark_to_infinity()
                       | 'main windowInto' >> beam.WindowInto(
                          window.FixedWindows(10),
                          timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
                       | 'Combine' >> beam.CombinePerKey(sum))

        # Pair each combined element with the timestamp it was emitted at.
        class RecordFn(beam.DoFn):
          def process(self,
                      elm=beam.DoFn.ElementParam,
                      ts=beam.DoFn.TimestampParam):
            yield (elm, ts)

        records = (main_stream | beam.ParDo(RecordFn()))

        expected_window_to_elements = {
            window.IntervalWindow(0, 10): [
                (('k', 500), Timestamp(9)),
            ],
        }

        assert_that(
            records,
            equal_to_per_window(expected_window_to_elements),
            use_global_window=False,
            label='assert per window')

        p.run()


I expect the results to be as follows, depending on the TimestampCombiner
strategy:
LATEST:    (('k', 500), Timestamp(9)),
EARLIEST:    (('k', 500), Timestamp(0)),
END_OF_WINDOW: (('k', 500), Timestamp(10)),
The above outcome is partially confirmed by a Java-side test [1].

However, with the Beam Python SDK, the outcome is:
LATEST:    (('k', 500), Timestamp(10)),
EARLIEST:    (('k', 500), Timestamp(10)),
END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),

What did I miss? What is the correct expected behavior? Or is this a bug?

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390

Cheers,

Re: Behavior of TimestampCombiner?

Posted by Robert Bradshaw <ro...@google.com>.
I bet, as with the previous one, this is due to over-eager combiner lifting.

On Tue, Nov 12, 2019 at 4:17 PM Ruoyun Huang <ru...@google.com> wrote:
>
> Reported a tracking JIRA:  https://issues.apache.org/jira/browse/BEAM-8645
>
> On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <ru...@google.com> wrote:
>>
>> Thanks for confirming.
>>
>> Since it is unexpected behavior, I shall look into jira if it is already on radar, if not, will create one.
>>
>> On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
>>> results for LATEST and EARLIEST should be 9 and 0 respectively.
>>>

Re: Behavior of TimestampCombiner?

Posted by Ruoyun Huang <ru...@google.com>.
Reported a tracking JIRA:  https://issues.apache.org/jira/browse/BEAM-8645

-- 
================
Ruoyun  Huang

Re: Behavior of TimestampCombiner?

Posted by Ruoyun Huang <ru...@google.com>.
Thanks for confirming.

Since this is unexpected behavior, I will check JIRA to see whether it is
already tracked; if not, I will create an issue.


-- 
================
Ruoyun  Huang

Re: Behavior of TimestampCombiner?

Posted by Robert Bradshaw <ro...@google.com>.
The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
results for LATEST and EARLIEST should be 9 and 0 respectively.
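
As a rough illustration of these values, here is a sketch in plain Python. It
is not Beam's implementation: the helper expected_output_timestamp and the two
granularity constants are hypothetical names introduced only for this example.
It assumes the end-of-window timestamp is the window end minus the SDK's
smallest time unit, taken here as 1 microsecond for Python and 1 millisecond
for Java, which matches the 9.999999 and 9.999000 figures.

```python
from fractions import Fraction

# Assumed timestamp granularity per SDK (hypothetical constants for this sketch).
PYTHON_GRANULARITY = Fraction(1, 1_000_000)  # 1 microsecond
JAVA_GRANULARITY = Fraction(1, 1_000)        # 1 millisecond


def expected_output_timestamp(strategy, element_timestamps, window_end,
                              granularity=PYTHON_GRANULARITY):
    """Return the timestamp a combined output is expected to carry."""
    if strategy == "EARLIEST":
        return Fraction(min(element_timestamps))
    if strategy == "LATEST":
        return Fraction(max(element_timestamps))
    if strategy == "END_OF_WINDOW":
        # The window [start, end) excludes `end`, so its maximum timestamp
        # is one granularity step before the end.
        return Fraction(window_end) - granularity
    raise ValueError("unknown strategy: %s" % strategy)


timestamps = [0, 9]   # the two elements from the test
window_end = 10       # FixedWindows(10) gives the window [0, 10)

for name in ("EARLIEST", "LATEST", "END_OF_WINDOW"):
    print(name, float(expected_output_timestamp(name, timestamps, window_end)))
```

Passing JAVA_GRANULARITY as the last argument gives the millisecond-based
end-of-window value instead.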
