Posted to dev@beam.apache.org by Ruoyun Huang <ru...@google.com> on 2019/11/12 01:34:14 UTC
Behavior of TimestampCombiner?
Hi, Folks,
I am trying to understand the behavior of TimestampCombiner. I have a
test like this:

import unittest

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to_per_window
from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.utils.timestamp import Timestamp


class TimestampCombinerTest(unittest.TestCase):

  def test_combiner_latest(self):
    """Test TimestampCombiner with LATEST."""
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    p = TestPipeline(options=options)

    # Two elements for key 'k' at timestamps 0 and 9, both in window [0, 10).
    main_stream = (p
                   | 'main TestStream' >> TestStream()
                   .add_elements([window.TimestampedValue(('k', 100), 0)])
                   .add_elements([window.TimestampedValue(('k', 400), 9)])
                   .advance_watermark_to_infinity()
                   | 'main windowInto' >> beam.WindowInto(
                       window.FixedWindows(10),
                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
                   | 'Combine' >> beam.CombinePerKey(sum))

    # Pair each combined element with the timestamp it was emitted at.
    class RecordFn(beam.DoFn):
      def process(self,
                  elm=beam.DoFn.ElementParam,
                  ts=beam.DoFn.TimestampParam):
        yield (elm, ts)

    records = (main_stream | beam.ParDo(RecordFn()))

    expected_window_to_elements = {
        window.IntervalWindow(0, 10): [
            (('k', 500), Timestamp(9)),
        ],
    }

    assert_that(
        records,
        equal_to_per_window(expected_window_to_elements),
        use_global_window=False,
        label='assert per window')

    p.run()

I expect the following results, depending on the TimestampCombiner
strategy:
LATEST: (('k', 500), Timestamp(9)),
EARLIEST: (('k', 500), Timestamp(0)),
END_OF_WINDOW: (('k', 500), Timestamp(10)),
The above outcome is partially confirmed by a test on the Java side [1].
However, with the Beam Python SDK, the outcome is:
LATEST: (('k', 500), Timestamp(10)),
EARLIEST: (('k', 500), Timestamp(10)),
END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
What did I miss? What is the correct expected behavior? Or does this look
like a bug?
[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
Cheers,
Re: Behavior of TimestampCombiner?
Posted by Robert Bradshaw <ro...@google.com>.
I bet, as with the previous one, this is due to over-eager combiner lifting.
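For readers unfamiliar with the term: combiner lifting splits a CombinePerKey into a partial combine that runs before the shuffle and a final merge after it. The toy model below is pure Python, not Beam code. It illustrates how such an optimization could lose the input timestamps if the partial step stamps its output with something other than the combined element timestamps. Using the window end as that stamp is purely an assumption, chosen to reproduce the observed value of 10. It is not a confirmed description of what the Python SDK does, and both function names are hypothetical.

```python
def combine_unlifted(elements, combine_timestamps):
    """Group first, then combine: every input timestamp is still available."""
    total = sum(value for value, _ in elements)
    return total, combine_timestamps(ts for _, ts in elements)


def combine_lifted(bundles, combine_timestamps, window_end):
    """Partially combine each bundle before the shuffle, then merge.

    ASSUMPTION: the partial step drops the element timestamps and stamps its
    output with window_end. This models the suspected failure mode only.
    """
    partials = [(sum(value for value, _ in bundle), window_end)
                for bundle in bundles]
    total = sum(value for value, _ in partials)
    # The timestamp combiner now only sees the stamps of the partial outputs.
    return total, combine_timestamps(ts for _, ts in partials)


# (value, timestamp) pairs for key 'k' from the test, window [0, 10).
elements = [(100, 0), (400, 9)]

print(combine_unlifted(elements, max))      # LATEST   -> (500, 9)
print(combine_unlifted(elements, min))      # EARLIEST -> (500, 0)
print(combine_lifted([elements], max, 10))  # LATEST   -> (500, 10)
print(combine_lifted([elements], min, 10))  # EARLIEST -> (500, 10)
```

In this model both LATEST and EARLIEST collapse to the same value once the original timestamps are gone, which matches the shape of the reported symptom.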
On Tue, Nov 12, 2019 at 4:17 PM Ruoyun Huang <ru...@google.com> wrote:
>
> Reported a tracking JIRA: https://issues.apache.org/jira/browse/BEAM-8645
Re: Behavior of TimestampCombiner?
Posted by Ruoyun Huang <ru...@google.com>.
Reported a tracking JIRA: https://issues.apache.org/jira/browse/BEAM-8645
On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <ru...@google.com> wrote:
> Thanks for confirming.
>
> Since this is unexpected behavior, I will check JIRA to see whether it is
> already tracked; if not, I will create an issue.
--
================
Ruoyun Huang
Re: Behavior of TimestampCombiner?
Posted by Ruoyun Huang <ru...@google.com>.
Thanks for confirming.
Since this is unexpected behavior, I will check JIRA to see whether it is
already tracked; if not, I will create an issue.
On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <ro...@google.com> wrote:
> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
> results for LATEST and EARLIEST should be 9 and 0 respectively.
--
================
Ruoyun Huang
Re: Behavior of TimestampCombiner?
Posted by Robert Bradshaw <ro...@google.com>.
The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
results for LATEST and EARLIEST should be 9 and 0 respectively.
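To make the three expected values concrete, here is a minimal pure-Python sketch that does not use Beam. It assumes the END_OF_WINDOW output timestamp is the largest timestamp still inside the window, that is, the exclusive window end minus one step of the SDK's timestamp resolution (taken here as microseconds for Python and milliseconds for Java, consistent with the 9.999999 and 9.999000 values above). The function name expected_output_timestamp and the strategy strings are hypothetical and exist only for this illustration.

```python
from fractions import Fraction

# Hypothetical strategy names for this sketch; Beam's own constants live on
# TimestampCombiner (e.g. OUTPUT_AT_LATEST, as used in the original test).
EARLIEST, LATEST, END_OF_WINDOW = "EARLIEST", "LATEST", "END_OF_WINDOW"


def expected_output_timestamp(strategy, element_timestamps, window_end,
                              resolution=Fraction(1, 1_000_000)):
    """Return the timestamp (in seconds) a combined output should carry.

    window_end is the exclusive end of the window. resolution is the smallest
    representable time step (assumed: 1 microsecond for Python,
    1 millisecond for Java).
    """
    if strategy == EARLIEST:
        return Fraction(min(element_timestamps))
    if strategy == LATEST:
        return Fraction(max(element_timestamps))
    if strategy == END_OF_WINDOW:
        # The window [start, end) excludes `end`, so the largest timestamp
        # still inside it is one resolution step earlier.
        return Fraction(window_end) - resolution
    raise ValueError("unknown strategy: %r" % (strategy,))


# The two elements from the test: timestamps 0 and 9 in the window [0, 10).
timestamps = [0, 9]
print(float(expected_output_timestamp(EARLIEST, timestamps, 10)))       # 0.0
print(float(expected_output_timestamp(LATEST, timestamps, 10)))         # 9.0
print(float(expected_output_timestamp(END_OF_WINDOW, timestamps, 10)))  # 9.999999
print(float(expected_output_timestamp(
    END_OF_WINDOW, timestamps, 10, resolution=Fraction(1, 1000))))      # 9.999
```

Fractions are used instead of floats so that the subtraction of one resolution step is exact; the values are converted to float only for printing.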