You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/03/25 22:19:33 UTC
[beam] branch master updated: [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 883271c [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
new 53800e8 Merge pull request #11227 from [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
883271c is described below
commit 883271c636f5a9381371c429ed32a50446e7528b
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Wed Mar 25 10:44:18 2020 -0700
[BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature
Change-Id: I9caaf395fd0fc58565e54a8458e8289af761815f
---
.../runners/interactive/interactive_runner_test.py | 64 +++++++++++-----------
1 file changed, 33 insertions(+), 31 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 329e0f0..45ef110 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -26,6 +26,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
+import sys
import unittest
from datetime import timedelta
@@ -40,6 +41,7 @@ from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import PaneInfo
from apache_beam.utils.windowed_value import PaneInfoTiming
@@ -150,9 +152,10 @@ class InteractiveRunnerTest(unittest.TestCase):
]
self.assertEqual(actual_reified, expected_reified)
+ @unittest.skipIf(
+ sys.version_info < (3, 5, 3),
+ 'The tests require at least Python 3.6 to work.')
def test_streaming_wordcount(self):
- self.skipTest('[BEAM-9601] Test is breaking PreCommits')
-
class WordExtractingDoFn(beam.DoFn):
def process(self, element):
text_line = element.strip()
@@ -196,20 +199,17 @@ class InteractiveRunnerTest(unittest.TestCase):
# This tests that the data was correctly cached.
pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
expected_data_df = pd.DataFrame([
- ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('or', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('not', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('that', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
- ('is', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
- ('the', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info),
- ('question', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info)
- ],
- columns=[
- 0, 'event_time', 'windows', 'pane_info'
- ])
+ ('to', 0, [IntervalWindow(0, 10)], pane_info),
+ ('be', 0, [IntervalWindow(0, 10)], pane_info),
+ ('or', 0, [IntervalWindow(0, 10)], pane_info),
+ ('not', 0, [IntervalWindow(0, 10)], pane_info),
+ ('to', 0, [IntervalWindow(0, 10)], pane_info),
+ ('be', 0, [IntervalWindow(0, 10)], pane_info),
+ ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
+ ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
+ ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
+ ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
+ ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
data_df = ib.collect(data, include_window_info=True)
pd.testing.assert_frame_equal(expected_data_df, data_df)
@@ -217,23 +217,25 @@ class InteractiveRunnerTest(unittest.TestCase):
# This tests that the windowing was passed correctly so that all the data
# is aggregated also correctly.
pane_info = PaneInfo(True, False, PaneInfoTiming.ON_TIME, 0, 0)
- expected_counts_df = pd.DataFrame(
- [('to', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('be', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('or', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('not', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info),
- ('that', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
- ('is', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
- ('the', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info),
- (
- 'question',
- 1,
- 29999999, [beam.window.IntervalWindow(20, 30)],
- pane_info)],
- columns=[0, 1, 'event_time', 'windows', 'pane_info'])
+ expected_counts_df = pd.DataFrame([
+ ('be', 2, 9999999, [IntervalWindow(0, 10)], pane_info),
+ ('not', 1, 9999999, [IntervalWindow(0, 10)], pane_info),
+ ('or', 1, 9999999, [IntervalWindow(0, 10)], pane_info),
+ ('to', 2, 9999999, [IntervalWindow(0, 10)], pane_info),
+ ('is', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+ ('question', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+ ('that', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+ ('the', 1, 29999999, [IntervalWindow(20, 30)], pane_info),
+ ], columns=[0, 1, 'event_time', 'windows', 'pane_info']) # yapf: disable
counts_df = ib.collect(counts, include_window_info=True)
- pd.testing.assert_frame_equal(expected_counts_df, counts_df)
+
+ # The group by key has no guarantee of order. So we post-process the DF by
+ # sorting so we can test equality.
+ sorted_counts_df = (counts_df
+ .sort_values(['event_time', 0], ascending=True)
+ .reset_index(drop=True)) # yapf: disable
+ pd.testing.assert_frame_equal(expected_counts_df, sorted_counts_df)
def test_session(self):
class MockPipelineRunner(object):