You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/04/04 16:20:11 UTC

[beam] branch master updated: Fix Python streaming wordcount IT to unblock PostCommit (#5015)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f82acc4  Fix Python streaming wordcount IT to unblock PostCommit (#5015)
f82acc4 is described below

commit f82acc437cc06a19241a07a5738bec4449ca01ad
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Wed Apr 4 09:20:00 2018 -0700

    Fix Python streaming wordcount IT to unblock PostCommit (#5015)
    
    * Resource cleanup when verification failed.
    * Add numbers to wordcount example WordExtractingDoFn
---
 .../examples/streaming_wordcount_it_test.py        | 15 +--------
 sdks/python/apache_beam/examples/wordcount.py      |  2 +-
 .../apache_beam/io/gcp/tests/pubsub_matcher.py     |  2 +-
 .../runners/dataflow/test_dataflow_runner.py       | 36 ++++++++++++----------
 sdks/python/run_postcommit.sh                      |  2 +-
 5 files changed, 23 insertions(+), 34 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 04e6e4e..d0b53f5 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -15,17 +15,9 @@
 # limitations under the License.
 #
 
-"""End-to-end test for the streaming wordcount example.
+"""End-to-end test for the streaming wordcount example."""
 
-Important: End-to-end test infrastructure for streaming pipeine in Python SDK
-is in development and is not yet available for use.
-
-Currently, this test blocks until the job is manually terminated.
-"""
-
-import datetime
 import logging
-import random
 import unittest
 import uuid
 
@@ -68,11 +60,6 @@ class StreamingWordCountIT(unittest.TestCase):
     self.input_sub.create()
     self.output_sub.create()
 
-  def _generate_identifier(self):
-    seed = random.randint(0, 999)
-    current_time = datetime.datetime.now().strftime('%m%d%H%M%S')
-    return '%s%d' % (current_time, seed)
-
   def _inject_numbers(self, topic, num_messages):
     """Inject numbers as test data to PubSub."""
     logging.debug('Injecting %d numbers to topic %s',
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 7610672..a26ef8a 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -59,7 +59,7 @@ class WordExtractingDoFn(beam.DoFn):
     text_line = element.strip()
     if not text_line:
       self.empty_line_counter.inc(1)
-    words = re.findall(r'[A-Za-z\']+', text_line)
+    words = re.findall(r'[A-Za-z0-9\']+', text_line)
     for w in words:
       self.words_counter.inc()
       self.word_lengths_counter.inc(len(w))
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index 8fb6879..1fb712f 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -72,7 +72,7 @@ class PubSubMessageMatcher(BaseMatcher):
     self.messages = None
 
   def _matches(self, _):
-    if not self.messages:
+    if self.messages is None:
       subscription = (pubsub
                       .Client(project=self.project)
                       .subscription(self.sub_name))
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 50390b9..765ed24 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -48,18 +48,20 @@ class TestDataflowRunner(DataflowRunner):
       # in some cases.
       print('Found: %s.' % self.build_console_url(pipeline.options))
 
-    if not options.view_as(StandardOptions).streaming:
-      self.result.wait_until_finish()
-    else:
-      self.wait_until_in_state(PipelineState.RUNNING)
-
-    if on_success_matcher:
-      from hamcrest import assert_that as hc_assert_that
-      hc_assert_that(self.result, pickler.loads(on_success_matcher))
-
-    if options.view_as(StandardOptions).streaming:
-      self.result.cancel()
-      self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
+    try:
+      if not options.view_as(StandardOptions).streaming:
+        self.result.wait_until_finish()
+      else:
+        self.wait_until_in_state(PipelineState.RUNNING)
+
+      if on_success_matcher:
+        from hamcrest import assert_that as hc_assert_that
+        hc_assert_that(self.result, pickler.loads(on_success_matcher))
+    finally:
+      if not self.result.is_in_terminal_state():
+        self.result.cancel()
+      if options.view_as(StandardOptions).streaming:
+        self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
 
     return self.result
 
@@ -72,7 +74,7 @@ class TestDataflowRunner(DataflowRunner):
         'https://console.cloud.google.com/dataflow/jobsDetail/locations'
         '/%s/jobs/%s?project=%s' % (region_id, job_id, project))
 
-  def wait_until_in_state(self, state, timeout=WAIT_TIMEOUT):
+  def wait_until_in_state(self, expected_state, timeout=WAIT_TIMEOUT):
     """Wait until Dataflow pipeline terminate or enter RUNNING state."""
     if not self.result.has_job:
       raise IOError('Failed to get the Dataflow job id.')
@@ -80,11 +82,11 @@ class TestDataflowRunner(DataflowRunner):
     start_time = time.time()
     while time.time() - start_time <= timeout:
       job_state = self.result.state
-      if (self.result.is_in_terminal_state() or
-          job_state == PipelineState.RUNNING):
+      if self.result.is_in_terminal_state() or job_state == expected_state:
         return job_state
       time.sleep(5)
 
     raise RuntimeError('Timeout after %d seconds while waiting for job %s '
-                       'enters RUNNING or terminate state.' %
-                       (WAIT_TIMEOUT, self.result.job_id))
+                       'enters expected state %s. Current state is %s.' %
+                       (WAIT_TIMEOUT, self.result.job_id,
+                        expected_state, self.result.state))
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 7dc35c4..041ae03 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -71,7 +71,7 @@ python setup.py nosetests \
   --attr IT \
   --nocapture \
   --processes=4 \
-  --process-timeout=900 \
+  --process-timeout=1800 \
   --test-pipeline-options=" \
     --runner=TestDataflowRunner \
     --project=$PROJECT \

-- 
To stop receiving notification emails like this one, please contact
altay@apache.org.