You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/04/04 16:20:11 UTC
[beam] branch master updated: Fix Python streaming wordcount IT to
unblock PostCommit (#5015)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f82acc4 Fix Python streaming wordcount IT to unblock PostCommit (#5015)
f82acc4 is described below
commit f82acc437cc06a19241a07a5738bec4449ca01ad
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Wed Apr 4 09:20:00 2018 -0700
Fix Python streaming wordcount IT to unblock PostCommit (#5015)
* Resource cleanup when verification failed.
* Add numbers to wordcount example WordExtractingDoFn
---
.../examples/streaming_wordcount_it_test.py | 15 +--------
sdks/python/apache_beam/examples/wordcount.py | 2 +-
.../apache_beam/io/gcp/tests/pubsub_matcher.py | 2 +-
.../runners/dataflow/test_dataflow_runner.py | 36 ++++++++++++----------
sdks/python/run_postcommit.sh | 2 +-
5 files changed, 23 insertions(+), 34 deletions(-)
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 04e6e4e..d0b53f5 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -15,17 +15,9 @@
# limitations under the License.
#
-"""End-to-end test for the streaming wordcount example.
+"""End-to-end test for the streaming wordcount example."""
-Important: End-to-end test infrastructure for streaming pipeine in Python SDK
-is in development and is not yet available for use.
-
-Currently, this test blocks until the job is manually terminated.
-"""
-
-import datetime
import logging
-import random
import unittest
import uuid
@@ -68,11 +60,6 @@ class StreamingWordCountIT(unittest.TestCase):
self.input_sub.create()
self.output_sub.create()
- def _generate_identifier(self):
- seed = random.randint(0, 999)
- current_time = datetime.datetime.now().strftime('%m%d%H%M%S')
- return '%s%d' % (current_time, seed)
-
def _inject_numbers(self, topic, num_messages):
"""Inject numbers as test data to PubSub."""
logging.debug('Injecting %d numbers to topic %s',
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 7610672..a26ef8a 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -59,7 +59,7 @@ class WordExtractingDoFn(beam.DoFn):
text_line = element.strip()
if not text_line:
self.empty_line_counter.inc(1)
- words = re.findall(r'[A-Za-z\']+', text_line)
+ words = re.findall(r'[A-Za-z0-9\']+', text_line)
for w in words:
self.words_counter.inc()
self.word_lengths_counter.inc(len(w))
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 8fb6879..1fb712f 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -72,7 +72,7 @@ class PubSubMessageMatcher(BaseMatcher):
self.messages = None
def _matches(self, _):
- if not self.messages:
+ if self.messages is None:
subscription = (pubsub
.Client(project=self.project)
.subscription(self.sub_name))
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 50390b9..765ed24 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -48,18 +48,20 @@ class TestDataflowRunner(DataflowRunner):
# in some cases.
print('Found: %s.' % self.build_console_url(pipeline.options))
- if not options.view_as(StandardOptions).streaming:
- self.result.wait_until_finish()
- else:
- self.wait_until_in_state(PipelineState.RUNNING)
-
- if on_success_matcher:
- from hamcrest import assert_that as hc_assert_that
- hc_assert_that(self.result, pickler.loads(on_success_matcher))
-
- if options.view_as(StandardOptions).streaming:
- self.result.cancel()
- self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
+ try:
+ if not options.view_as(StandardOptions).streaming:
+ self.result.wait_until_finish()
+ else:
+ self.wait_until_in_state(PipelineState.RUNNING)
+
+ if on_success_matcher:
+ from hamcrest import assert_that as hc_assert_that
+ hc_assert_that(self.result, pickler.loads(on_success_matcher))
+ finally:
+ if not self.result.is_in_terminal_state():
+ self.result.cancel()
+ if options.view_as(StandardOptions).streaming:
+ self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
return self.result
@@ -72,7 +74,7 @@ class TestDataflowRunner(DataflowRunner):
'https://console.cloud.google.com/dataflow/jobsDetail/locations'
'/%s/jobs/%s?project=%s' % (region_id, job_id, project))
- def wait_until_in_state(self, state, timeout=WAIT_TIMEOUT):
+ def wait_until_in_state(self, expected_state, timeout=WAIT_TIMEOUT):
"""Wait until Dataflow pipeline terminate or enter RUNNING state."""
if not self.result.has_job:
raise IOError('Failed to get the Dataflow job id.')
@@ -80,11 +82,11 @@ class TestDataflowRunner(DataflowRunner):
start_time = time.time()
while time.time() - start_time <= timeout:
job_state = self.result.state
- if (self.result.is_in_terminal_state() or
- job_state == PipelineState.RUNNING):
+ if self.result.is_in_terminal_state() or job_state == expected_state:
return job_state
time.sleep(5)
raise RuntimeError('Timeout after %d seconds while waiting for job %s '
- 'enters RUNNING or terminate state.' %
- (WAIT_TIMEOUT, self.result.job_id))
+ 'enters expected state %s. Current state is %s.' %
+ (WAIT_TIMEOUT, self.result.job_id,
+ expected_state, self.result.state))
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 7dc35c4..041ae03 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -71,7 +71,7 @@ python setup.py nosetests \
--attr IT \
--nocapture \
--processes=4 \
- --process-timeout=900 \
+ --process-timeout=1800 \
--test-pipeline-options=" \
--runner=TestDataflowRunner \
--project=$PROJECT \
--
To stop receiving notification emails like this one, please contact
altay@apache.org.