You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the word "here", which is not preserved in this plain-text version.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/25 22:19:33 UTC

[beam] branch master updated: [BEAM-9601] Skip the streaming wordcount test only on Python versions older than 3.5.3, because it uses a Python 3.5.3+ feature

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 883271c  [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
     new 53800e8  Merge pull request #11227 from [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
883271c is described below

commit 883271c636f5a9381371c429ed32a50446e7528b
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Wed Mar 25 10:44:18 2020 -0700

    [BEAM-9601] Skip the streaming wordcount test only on Python versions older than 3.5.3, because it uses a Python 3.5.3+ feature
    
    Change-Id: I9caaf395fd0fc58565e54a8458e8289af761815f
---
 .../runners/interactive/interactive_runner_test.py | 64 +++++++++++-----------
 1 file changed, 33 insertions(+), 31 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 329e0f0..45ef110 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -26,6 +26,7 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
+import sys
 import unittest
 from datetime import timedelta
 
@@ -40,6 +41,7 @@ from apache_beam.runners.interactive import interactive_runner
 from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import IntervalWindow
 from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import PaneInfo
 from apache_beam.utils.windowed_value import PaneInfoTiming
@@ -150,9 +152,10 @@ class InteractiveRunnerTest(unittest.TestCase):
     ]
     self.assertEqual(actual_reified, expected_reified)
 
+  @unittest.skipIf(
+      sys.version_info < (3, 5, 3),
+      'The tests require at least Python 3.6 to work.')
   def test_streaming_wordcount(self):
-    self.skipTest('[BEAM-9601] Test is breaking PreCommits')
-
     class WordExtractingDoFn(beam.DoFn):
       def process(self, element):
         text_line = element.strip()
@@ -196,20 +199,17 @@ class InteractiveRunnerTest(unittest.TestCase):
     # This tests that the data was correctly cached.
     pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
     expected_data_df = pd.DataFrame([
-        ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('or', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('not', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
-        ('that', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
-        ('is', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
-        ('the', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
-        ('question', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info)
-    ],
-                                    columns=[
-                                        0, 'event_time', 'windows', 'pane_info'
-                                    ])
+        ('to', 0, [IntervalWindow(0, 10)], pane_info),
+        ('be', 0, [IntervalWindow(0, 10)], pane_info),
+        ('or', 0, [IntervalWindow(0, 10)], pane_info),
+        ('not', 0, [IntervalWindow(0, 10)], pane_info),
+        ('to', 0, [IntervalWindow(0, 10)], pane_info),
+        ('be', 0, [IntervalWindow(0, 10)], pane_info),
+        ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
+        ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
+        ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
+        ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
+    ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
 
     data_df = ib.collect(data, include_window_info=True)
     pd.testing.assert_frame_equal(expected_data_df, data_df)
@@ -217,23 +217,25 @@ class InteractiveRunnerTest(unittest.TestCase):
     # This tests that the windowing was passed correctly so that all the data
     # is aggregated also correctly.
     pane_info = PaneInfo(True, False, PaneInfoTiming.ON_TIME, 0, 0)
-    expected_counts_df = pd.DataFrame(
-        [('to', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
-         ('be', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
-         ('or', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
-         ('not', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
-         ('that', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
-         ('is', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
-         ('the', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
-         (
-             'question',
-             1,
-             29999999, [beam.window.IntervalWindow(20, 30)],
-             pane_info)],
-        columns=[0, 1, 'event_time', 'windows', 'pane_info'])
+    expected_counts_df = pd.DataFrame([
+        ('be', 2, 9999999, [IntervalWindow(0, 10)], pane_info),
+        ('not', 1, 9999999, [IntervalWindow(0, 10)], pane_info),
+        ('or', 1, 9999999, [IntervalWindow(0, 10)], pane_info),
+        ('to', 2, 9999999, [IntervalWindow(0, 10)], pane_info),
+        ('is', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+        ('question', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+        ('that', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+        ('the', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+    ], columns=[0, 1, 'event_time', 'windows', 'pane_info']) # yapf: disable
 
     counts_df = ib.collect(counts, include_window_info=True)
-    pd.testing.assert_frame_equal(expected_counts_df, counts_df)
+
+    # The group by key has no guarantee of order. So we post-process the DF by
+    # sorting so we can test equality.
+    sorted_counts_df = (counts_df
+                        .sort_values(['event_time', 0], ascending=True)
+                        .reset_index(drop=True)) # yapf: disable
+    pd.testing.assert_frame_equal(expected_counts_df, sorted_counts_df)
 
   def test_session(self):
     class MockPipelineRunner(object):