Posted to issues@beam.apache.org by "Sam Rohde (Jira)" <ji...@apache.org> on 2020/05/11 18:20:00 UTC

[jira] [Comment Edited] (BEAM-9767) test_streaming_wordcount flaky timeouts

    [ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104733#comment-17104733 ] 

Sam Rohde edited comment on BEAM-9767 at 5/11/20, 6:19 PM:
-----------------------------------------------------------

Thanks for the update, Brian. I have root-caused it and have a PR out now. It seems to be an edge case around the StreamingCache.

 

EDIT: PR is [https://github.com/apache/beam/pull/11663]


was (Author: rohdesam):
Thanks for the update, Brian. I have root-caused it and have a PR out now. It seems to be an edge case around the StreamingCache.

> test_streaming_wordcount flaky timeouts
> ---------------------------------------
>
>                 Key: BEAM-9767
>                 URL: https://issues.apache.org/jira/browse/BEAM-9767
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core, test-failures
>            Reporter: Udi Meiri
>            Assignee: Sam Rohde
>            Priority: Critical
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Timed out after 600s, typically completes in 2.8s on my workstation.
> https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/
> {code}
> self = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>
>     @unittest.skipIf(
>         sys.version_info < (3, 5, 3),
>         'The tests require at least Python 3.6 to work.')
>     def test_streaming_wordcount(self):
>       class WordExtractingDoFn(beam.DoFn):
>         def process(self, element):
>           text_line = element.strip()
>           words = text_line.split()
>           return words
>     
>       # Add the TestStream so that it can be cached.
>       ib.options.capturable_sources.add(TestStream)
>       ib.options.capture_duration = timedelta(seconds=5)
>     
>       p = beam.Pipeline(
>           runner=interactive_runner.InteractiveRunner(),
>           options=StandardOptions(streaming=True))
>     
>       data = (
>           p
>           | TestStream()
>               .advance_watermark_to(0)
>               .advance_processing_time(1)
>               .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
>               .advance_watermark_to(20)
>               .advance_processing_time(1)
>               .add_elements(['that', 'is', 'the', 'question'])
>           | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
>     
>       counts = (
>           data
>           | 'split' >> beam.ParDo(WordExtractingDoFn())
>           | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
>           | 'group' >> beam.GroupByKey()
>           | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
>     
>       # Watch the local scope for Interactive Beam so that referenced PCollections
>       # will be cached.
>       ib.watch(locals())
>     
>       # This is normally done in the interactive_utils when a transform is
>       # applied but needs an IPython environment. So we manually run this here.
>       ie.current_env().track_user_pipelines()
>     
>       # Create a fake limiter that cancels the BCJ once the main job receives the
>       # expected amount of results.
>       class FakeLimiter:
>         def __init__(self, p, pcoll):
>           self.p = p
>           self.pcoll = pcoll
>     
>         def is_triggered(self):
>           result = ie.current_env().pipeline_result(self.p)
>           if result:
>             try:
>               results = result.get(self.pcoll)
>             except ValueError:
>               return False
>             return len(results) >= 10
>           return False
>     
>       # This sets the limiters to stop reading when the test receives 10 elements
>       # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
>       ie.current_env().options.capture_control.set_limiters_for_test(
>           [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
>     
>       # This tests that the data was correctly cached.
>       pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
>       expected_data_df = pd.DataFrame([
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('or', 0, [IntervalWindow(0, 10)], pane_info),
>           ('not', 0, [IntervalWindow(0, 10)], pane_info),
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
>       ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
>     
> >     data_df = ib.collect(data, include_window_info=True)
> apache_beam/runners/interactive/interactive_runner_test.py:237: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> apache_beam/runners/interactive/interactive_beam.py:451: in collect
>     return head(pcoll, n=-1, include_window_info=include_window_info)
> apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator
>     return func(*args, **kwargs)
> apache_beam/runners/interactive/interactive_beam.py:515: in head
>     result.wait_until_finish()
> apache_beam/runners/interactive/interactive_runner.py:250: in wait_until_finish
>     self._underlying_result.wait_until_finish()
> apache_beam/runners/direct/direct_runner.py:455: in wait_until_finish
>     self._executor.await_completion()
> apache_beam/runners/direct/executor.py:439: in await_completion
>     self._executor.await_completion()
> apache_beam/runners/direct/executor.py:484: in await_completion
>     update = self.visible_updates.take()
> apache_beam/runners/direct/executor.py:557: in take
>     item = self._queue.get(timeout=1)
> /usr/lib/python3.6/queue.py:173: in get
>     self.not_empty.wait(remaining)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> self = <Condition(<unlocked _thread.lock object at 0x7f2f85244198>, 0)>
> timeout = 0.9999979329295456
>     def wait(self, timeout=None):
>         """Wait until notified or until a timeout occurs.
>     
>         If the calling thread has not acquired the lock when this method is
>         called, a RuntimeError is raised.
>     
>         This method releases the underlying lock, and then blocks until it is
>         awakened by a notify() or notify_all() call for the same condition
>         variable in another thread, or until the optional timeout occurs. Once
>         awakened or timed out, it re-acquires the lock and returns.
>     
>         When the timeout argument is present and not None, it should be a
>         floating point number specifying a timeout for the operation in seconds
>         (or fractions thereof).
>     
>         When the underlying lock is an RLock, it is not released using its
>         release() method, since this may not actually unlock the lock when it
>         was acquired multiple times recursively. Instead, an internal interface
>         of the RLock class is used, which really unlocks it even when it has
>         been recursively acquired several times. Another internal interface is
>         then used to restore the recursion level when the lock is reacquired.
>     
>         """
>         if not self._is_owned():
>             raise RuntimeError("cannot wait on un-acquired lock")
>         waiter = _allocate_lock()
>         waiter.acquire()
>         self._waiters.append(waiter)
>         saved_state = self._release_save()
>         gotit = False
>         try:    # restore state no matter what (e.g., KeyboardInterrupt)
>             if timeout is None:
>                 waiter.acquire()
>                 gotit = True
>             else:
>                 if timeout > 0:
> >                   gotit = waiter.acquire(True, timeout)
> E                   Failed: Timeout >600.0s
> /usr/lib/python3.6/threading.py:299: Failed
> {code}
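
For context on the hang itself: the bottom of the trace shows wait_until_finish() blocking inside a take() that polls the executor's update queue in 1-second slices (the `self._queue.get(timeout=1)` frame). The sketch below, with illustrative names only and not Beam's actual executor code, shows why that pattern never returns on its own when no terminal update is ever posted, so the run only ends when pytest's 600s timeout fires:

{code}
import queue
import threading
import time


class VisibleUpdates(object):
  """Illustrative stand-in for the update queue seen in the trace."""

  def __init__(self):
    self._queue = queue.Queue()

  def post(self, update):
    self._queue.put(update)

  def take(self):
    # Mirrors the `self._queue.get(timeout=1)` frame: poll in 1-second
    # slices until an update arrives; there is no overall deadline here.
    while True:
      try:
        return self._queue.get(timeout=1)
      except queue.Empty:
        pass


def wait_until_finish(updates):
  # Blocks until a terminal update is posted. If the pipeline never
  # produces one, this loop spins until the outer 600s test timeout
  # kills the process.
  while updates.take() != 'done':
    pass


if __name__ == '__main__':
  updates = VisibleUpdates()
  # Healthy case: a worker posts the terminal update after ~2.8s,
  # matching the typical runtime reported above.
  threading.Timer(2.8, lambda: updates.post('done')).start()
  start = time.time()
  wait_until_finish(updates)
  print('finished in %.1fs' % (time.time() - start))
{code}

In the flaky runs the terminal update presumably never arrives because the cached TestStream reader stalls, which would line up with the StreamingCache edge case mentioned in the comment above.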



--
This message was sent by Atlassian Jira
(v8.3.4#803005)