You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Ning Kang (Jira)" <ji...@apache.org> on 2020/04/22 16:55:00 UTC

[jira] [Created] (BEAM-9803) test_streaming_wordcount flaky

Ning Kang created BEAM-9803:
-------------------------------

             Summary: test_streaming_wordcount flaky
                 Key: BEAM-9803
                 URL: https://issues.apache.org/jira/browse/BEAM-9803
             Project: Beam
          Issue Type: Test
          Components: sdk-py-core, test-failures
            Reporter: Ning Kang


{code:java}
Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 sec.Error MessageAssertionError: DataFrame are different  DataFrame shape mismatch [left]:  (10, 4) [right]: (6, 4)Stacktraceself = <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest testMethod=test_streaming_wordcount>

    @unittest.skipIf(
        sys.version_info < (3, 5, 3),
        'The tests require at least Python 3.6 to work.')
    def test_streaming_wordcount(self):
      class WordExtractingDoFn(beam.DoFn):
        def process(self, element):
          text_line = element.strip()
          words = text_line.split()
          return words
    
      # Add the TestStream so that it can be cached.
      ib.options.capturable_sources.add(TestStream)
      ib.options.capture_duration = timedelta(seconds=5)
    
      p = beam.Pipeline(
          runner=interactive_runner.InteractiveRunner(),
          options=StandardOptions(streaming=True))
    
      data = (
          p
          | TestStream()
              .advance_watermark_to(0)
              .advance_processing_time(1)
              .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
              .advance_watermark_to(20)
              .advance_processing_time(1)
              .add_elements(['that', 'is', 'the', 'question'])
          | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
    
      counts = (
          data
          | 'split' >> beam.ParDo(WordExtractingDoFn())
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(lambda wordones: (wordones[0], sum(wordones[1]))))
    
      # Watch the local scope for Interactive Beam so that referenced PCollections
      # will be cached.
      ib.watch(locals())
    
      # This is normally done in the interactive_utils when a transform is
      # applied but needs an IPython environment. So we manually run this here.
      ie.current_env().track_user_pipelines()
    
      # Create a fake limiter that cancels the BCJ once the main job receives the
      # expected amount of results.
      class FakeLimiter:
        def __init__(self, p, pcoll):
          self.p = p
          self.pcoll = pcoll
    
        def is_triggered(self):
          result = ie.current_env().pipeline_result(self.p)
          if result:
            try:
              results = result.get(self.pcoll)
            except ValueError:
              return False
            return len(results) >= 10
          return False
    
      # This sets the limiters to stop reading when the test receives 10 elements
      # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
      ie.current_env().options.capture_control.set_limiters_for_test(
          [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
    
      # This tests that the data was correctly cached.
      pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
      expected_data_df = pd.DataFrame([
          ('to', 0, [IntervalWindow(0, 10)], pane_info),
          ('be', 0, [IntervalWindow(0, 10)], pane_info),
          ('or', 0, [IntervalWindow(0, 10)], pane_info),
          ('not', 0, [IntervalWindow(0, 10)], pane_info),
          ('to', 0, [IntervalWindow(0, 10)], pane_info),
          ('be', 0, [IntervalWindow(0, 10)], pane_info),
          ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
          ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
      ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
    
      data_df = ib.collect(data, include_window_info=True)
>     pd.testing.assert_frame_equal(expected_data_df, data_df)
E     AssertionError: DataFrame are different
E     
E     DataFrame shape mismatch
E     [left]:  (10, 4)
E     [right]: (6, 4)

apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)