Posted to commits@beam.apache.org by "Ahmet Altay (JIRA)" <ji...@apache.org> on 2017/12/22 18:57:00 UTC
[jira] [Assigned] (BEAM-3377) assert_that not working for streaming
[ https://issues.apache.org/jira/browse/BEAM-3377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ahmet Altay reassigned BEAM-3377:
---------------------------------
Assignee: (was: Ahmet Altay)
> assert_that not working for streaming
> -------------------------------------
>
> Key: BEAM-3377
> URL: https://issues.apache.org/jira/browse/BEAM-3377
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.2.0
> Reporter: María GH
> Labels: starter
>
> assert_that does not work for AfterWatermark timers.
> Easy way to reproduce: modify test_gbk_execution [1] in this form:
>
> {code:java}
> def test_this(self):
>   test_stream = (TestStream()
>                  .add_elements(['a', 'b', 'c'])
>                  .advance_watermark_to(20))
>   def fnc(x):
>     print 'fired_elem:', x
>     return x
>   options = PipelineOptions()
>   options.view_as(StandardOptions).streaming = True
>   p = TestPipeline(options=options)
>   records = (p
>              | test_stream
>              | beam.WindowInto(
>                  FixedWindows(15),
>                  trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
>                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
>              | beam.Map(lambda x: ('k', x))
>              | beam.GroupByKey())
>   assert_that(records, equal_to([
>       ('k', ['a', 'b', 'c'])]))
>   p.run()
> {code}
> This test will pass, but if the .advance_watermark_to(20) is removed, the test will fail. However, both cases fire the same elements:
> fired_elem: ('k', ['a', 'b', 'c'])
> fired_elem: ('k', ['a', 'b', 'c'])
> In the passing case, they correspond to the sorted_actual inside the assert_that. In the failing case:
> sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
> sorted_actual: []
> [1] https://github.com/mariapython/incubator-beam/blob/direct-timers-show/sdks/python/apache_beam/testing/test_stream_test.py#L120
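
The two sorted_actual values reported for the failing case can be checked against the expected list with a plain sorted-list comparison. The following is a minimal sketch in plain Python 3, assuming the matcher compares sorted lists (as the name sorted_actual suggests). matches_expected is a hypothetical stand-in, not Beam's equal_to, and the sketch does not explain why the matcher is evaluated on those inputs in the first place.

```python
def matches_expected(expected, actual):
    """Hypothetical stand-in for an order-insensitive equality matcher.

    Assumption: the real matcher compares sorted copies of both lists.
    """
    return sorted(expected) == sorted(actual)


expected = [('k', ['a', 'b', 'c'])]

# Values of sorted_actual taken from the report above.
passing_actual = [('k', ['a', 'b', 'c'])]
failing_actual_duplicated = [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
failing_actual_empty = []

print(matches_expected(expected, passing_actual))             # True
print(matches_expected(expected, failing_actual_duplicated))  # False
print(matches_expected(expected, failing_actual_empty))       # False
```

Under that assumption, either of the two failing-case inputs is enough on its own to fail the assertion: the duplicated firing has one element too many, and the empty list has none.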
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)