Posted to issues@beam.apache.org by "Pablo Estrada (Jira)" <ji...@apache.org> on 2019/09/04 20:21:00 UTC

[jira] [Updated] (BEAM-3377) assert_that not working for streaming

     [ https://issues.apache.org/jira/browse/BEAM-3377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pablo Estrada updated BEAM-3377:
--------------------------------
    Labels:   (was: starter)

> assert_that not working for streaming
> -------------------------------------
>
>                 Key: BEAM-3377
>                 URL: https://issues.apache.org/jira/browse/BEAM-3377
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.2.0
>            Reporter: María GH
>            Priority: Major
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> assert_that does not work with AfterWatermark timers.
> An easy way to reproduce this is to modify test_gbk_execution [1] as follows:
>  
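> {code:python}
> from __future__ import print_function
>
> import unittest
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.testing.util import assert_that
> from apache_beam.testing.util import equal_to
> from apache_beam.transforms import trigger
> from apache_beam.transforms.window import FixedWindows
>
>
> class TestStreamTest(unittest.TestCase):
>
>   def test_this(self):
>     test_stream = (TestStream()
>                    .add_elements(['a', 'b', 'c'])
>                    .advance_watermark_to(20))
>
>     def fnc(x):
>       # Log every pane emitted by GroupByKey (the fired_elem lines below).
>       print('fired_elem:', x)
>       return x
>
>     # Run the pipeline in streaming mode.
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     p = TestPipeline(options=options)
>     records = (p
>                | test_stream
>                | beam.WindowInto(
>                    FixedWindows(15),
>                    trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
>                    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
>                | beam.Map(lambda x: ('k', x))
>                | beam.GroupByKey()
>                | beam.Map(fnc))
>     assert_that(records, equal_to([
>         ('k', ['a', 'b', 'c'])]))
>     p.run()
> {code}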
> This test passes, but if .advance_watermark_to(20) is removed, it fails. However, both cases fire the same elements:
> 	fired_elem: ('k', ['a', 'b', 'c'])
> 	fired_elem: ('k', ['a', 'b', 'c'])
> In the passing case, these elements correspond to sorted_actual inside assert_that. In the failing case, sorted_actual is printed twice, the second time as an empty list:
> 	sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
> 	sorted_actual: []
> [1] https://github.com/mariapython/incubator-beam/blob/direct-timers-show/sdks/python/apache_beam/testing/test_stream_test.py#L120
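
The only difference between the passing and failing runs is .advance_watermark_to(20). The windowing arithmetic behind that call can be sketched in pure Python. The snippet does not set element timestamps explicitly, so this sketch assumes they fall in [0, 15). Under that assumption, FixedWindows(15) places each element in the window [0, 15), and a watermark of 20 is past that window's end. Passing the window end is the condition under which AfterWatermark emits its on-time pane. The helpers fixed_window and watermark_passed are hypothetical illustrations and are not Beam APIs.

```python
# Hypothetical helper, not a Beam API: mirrors how FixedWindows(size) with
# offset 0 assigns an event timestamp ts to the window [start, start + size).
def fixed_window(ts, size=15):
    start = ts - ts % size
    return (start, start + size)


# Hypothetical helper, not a Beam API: a simplified on-time condition that
# treats the window as complete once the watermark reaches its end.
def watermark_passed(window, watermark):
    _, end = window
    return watermark >= end


# An element at timestamp 0 lands in [0, 15); watermark 20 is past its end.
print(fixed_window(0), watermark_passed(fixed_window(0), 20))
# An element at timestamp 15 would land in [15, 30), which 20 does not close.
print(fixed_window(15), watermark_passed(fixed_window(15), 20))
```

The two helpers are kept separate because window assignment and the firing condition are independent in Beam. Window assignment depends only on the element timestamp. The firing condition depends on the watermark.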
