You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/07 18:57:14 UTC
[beam] branch master updated: Merge pull request #11624 from [BEAM-9767] Make streaming_wordcount use a test timeout and increase from 5s to 30s
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fb70c2f Merge pull request #11624 from [BEAM-9767] Make streaming_wordcount use a test timeout and increase from 5s to 30s
fb70c2f is described below
commit fb70c2f1cfa297211182762fa644f04973a27124
Author: Sam sam <ro...@gmail.com>
AuthorDate: Thu May 7 11:56:56 2020 -0700
Merge pull request #11624 from [BEAM-9767] Make streaming_wordcount use a test timeout and increase from 5s to 30s
* make streaming_wordcount an integration test
Change-Id: I083dccc63d8c44274ec175e2bd1520c540adf9b3
* change to timeout
Change-Id: I7c7903e0f63dd992f52723c308efef5402d8fb11
---
.../apache_beam/runners/interactive/interactive_runner_test.py | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 5ed4f83..0d92aa5 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -28,7 +28,6 @@ from __future__ import print_function
import sys
import unittest
-from datetime import timedelta
import pandas as pd
@@ -38,9 +37,9 @@ from apache_beam.runners.direct import direct_runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import interactive_runner
-from apache_beam.runners.interactive.options.capture_limiters import DurationLimiter
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.util import timeout
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import IntervalWindow
from apache_beam.utils.timestamp import Timestamp
@@ -156,6 +155,7 @@ class InteractiveRunnerTest(unittest.TestCase):
@unittest.skipIf(
sys.version_info < (3, 5, 3),
'The tests require at least Python 3.6 to work.')
+ @timeout(30)
def test_streaming_wordcount(self):
class WordExtractingDoFn(beam.DoFn):
def process(self, element):
@@ -165,7 +165,6 @@ class InteractiveRunnerTest(unittest.TestCase):
# Add the TestStream so that it can be cached.
ib.options.capturable_sources.add(TestStream)
- ib.options.capture_duration = timedelta(seconds=5)
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(),
@@ -214,10 +213,9 @@ class InteractiveRunnerTest(unittest.TestCase):
return len(results) >= 10
return False
- # This sets the limiters to stop reading when the test receives 10 elements
- # or after 5 seconds have elapsed (to eliminate the possibility of hanging).
+ # This sets the limiters to stop reading when the test receives 10 elements.
ie.current_env().options.capture_control.set_limiters_for_test(
- [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
+ [FakeLimiter(p, data)])
# This tests that the data was correctly cached.
pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)