Posted to issues@beam.apache.org by "Udi Meiri (Jira)" <ji...@apache.org> on 2020/04/15 18:49:00 UTC

[jira] [Created] (BEAM-9767) test_streaming_wordcount flaky timeouts

Udi Meiri created BEAM-9767:
-------------------------------

             Summary: test_streaming_wordcount flaky timeouts
                 Key: BEAM-9767
                 URL: https://issues.apache.org/jira/browse/BEAM-9767
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core, test-failures
            Reporter: Udi Meiri
            Assignee: Sam Rohde


The test timed out after 600s; it typically completes in 2.8s on my workstation.

https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/
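
For anyone trying to reproduce locally, the snippet below is a minimal sketch of running just this test case through unittest (it assumes a working Beam Python dev environment; the module, class, and test names are taken from the traceback that follows, and the pre-commit job itself appears to run under a 600s test-level timeout):
{code}
# Hedged local-repro sketch: run only the flaky test case via unittest.
import unittest

from apache_beam.runners.interactive import interactive_runner_test

# Load InteractiveRunnerTest.test_streaming_wordcount from the test module
# named in the traceback below.
suite = unittest.defaultTestLoader.loadTestsFromName(
    'InteractiveRunnerTest.test_streaming_wordcount',
    module=interactive_runner_test)
unittest.TextTestRunner(verbosity=2).run(suite)
{code}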
{code}
self = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>

    @unittest.skipIf(
        sys.version_info < (3, 5, 3),
        'The tests require at least Python 3.6 to work.')
    def test_streaming_wordcount(self):
      class WordExtractingDoFn(beam.DoFn):
        def process(self, element):
          text_line = element.strip()
          words = text_line.split()
          return words
    
      # Add the TestStream so that it can be cached.
      ib.options.capturable_sources.add(TestStream)
      ib.options.capture_duration = timedelta(seconds=5)
    
      p = beam.Pipeline(
          runner=interactive_runner.InteractiveRunner(),
          options=StandardOptions(streaming=True))
    
      data = (
          p
          | TestStream()
              .advance_watermark_to(0)
              .advance_processing_time(1)
              .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
              .advance_watermark_to(20)
              .advance_processing_time(1)
              .add_elements(['that', 'is', 'the', 'question'])
          | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
    
      counts = (
          data
          | 'split' >> beam.ParDo(WordExtractingDoFn())
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
    
      # Watch the local scope for Interactive Beam so that referenced PCollections
      # will be cached.
      ib.watch(locals())
    
      # This is normally done in the interactive_utils when a transform is
      # applied but needs an IPython environment. So we manually run this here.
      ie.current_env().track_user_pipelines()
    
      # Create a fake limiter that cancels the BCJ once the main job receives the
      # expected amount of results.
      class FakeLimiter:
        def __init__(self, p, pcoll):
          self.p = p
          self.pcoll = pcoll
    
        def is_triggered(self):
          result = ie.current_env().pipeline_result(self.p)
          if result:
            try:
              results = result.get(self.pcoll)
            except ValueError:
              return False
            return len(results) >= 10
          return False
    
      # This sets the limiters to stop reading when the test receives 10 elements
      # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
      ie.current_env().options.capture_control.set_limiters_for_test(
          [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
    
      # This tests that the data was correctly cached.
      pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
      expected_data_df = pd.DataFrame([
          ('to', 0, [IntervalWindow(0, 10)], pane_info),
          ('be', 0, [IntervalWindow(0, 10)], pane_info),
          ('or', 0, [IntervalWindow(0, 10)], pane_info),
          ('not', 0, [IntervalWindow(0, 10)], pane_info),
          ('to', 0, [IntervalWindow(0, 10)], pane_info),
          ('be', 0, [IntervalWindow(0, 10)], pane_info),
          ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
      ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
    
>     data_df = ib.collect(data, include_window_info=True)

apache_beam/runners/interactive/interactive_runner_test.py:237: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/runners/interactive/interactive_beam.py:451: in collect
    return head(pcoll, n=-1, include_window_info=include_window_info)
apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator
    return func(*args, **kwargs)
apache_beam/runners/interactive/interactive_beam.py:515: in head
    result.wait_until_finish()
apache_beam/runners/interactive/interactive_runner.py:250: in wait_until_finish
    self._underlying_result.wait_until_finish()
apache_beam/runners/direct/direct_runner.py:455: in wait_until_finish
    self._executor.await_completion()
apache_beam/runners/direct/executor.py:439: in await_completion
    self._executor.await_completion()
apache_beam/runners/direct/executor.py:484: in await_completion
    update = self.visible_updates.take()
apache_beam/runners/direct/executor.py:557: in take
    item = self._queue.get(timeout=1)
/usr/lib/python3.6/queue.py:173: in get
    self.not_empty.wait(remaining)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Condition(<unlocked _thread.lock object at 0x7f2f85244198>, 0)>
timeout = 0.9999979329295456

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
    
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
    
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notify_all() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
    
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
    
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
    
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
>                   gotit = waiter.acquire(True, timeout)
E                   Failed: Timeout >600.0s

/usr/lib/python3.6/threading.py:299: Failed
{code}
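
For context on where the hang surfaces: the stack shows the DirectRunner executor blocked in take(), which polls its update queue with a 1-second timeout in a loop, so when no visible update ever arrives the loop just keeps waiting until the outer 600s test timeout fires. A simplified sketch of that polling pattern (illustrative only, not the actual executor code):
{code}
import queue


class VisibleUpdates(object):
  """Simplified stand-in for the executor's visible-updates queue."""

  def __init__(self):
    self._queue = queue.Queue()

  def offer(self, update):
    self._queue.put(update)

  def take(self):
    # Poll with a short timeout so the waiting thread stays responsive.
    # If the pipeline never produces an update, this loops indefinitely
    # and only an external limit (here, the 600s test timeout) ends it.
    while True:
      try:
        return self._queue.get(timeout=1)
      except queue.Empty:
        continue
{code}
If that reading is right, the executor-level wait is behaving as designed and the real question is why the streaming job never completes, e.g. whether the FakeLimiter/DurationLimiter pair set in the test ever triggers to cancel the capture.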


