You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/21 01:58:56 UTC

[beam] branch master updated: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 776fd5a  [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)
776fd5a is described below

commit 776fd5a6ae21352a20c388ecf23822e6def13854
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Tue Mar 20 18:58:52 2018 -0700

    [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)
    
    Create Python End-to-end Test for Streaming WordCount
---
 .../apache_beam/examples/streaming_wordcount.py    |   2 +-
 .../examples/streaming_wordcount_it_test.py        | 102 +++++++++++++++++++++
 .../runners/dataflow/test_dataflow_runner.py       |  37 +++++++-
 sdks/python/apache_beam/testing/test_pipeline.py   |   4 +-
 sdks/python/apache_beam/testing/test_utils.py      |  48 ++++++++++
 sdks/python/apache_beam/testing/test_utils_test.py |  55 ++++++++++-
 6 files changed, 242 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 12f7351..7ef95d8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -36,7 +36,7 @@ from apache_beam.options.pipeline_options import StandardOptions
 
 def split_fn(lines):
   import re
-  return re.findall(r'[A-Za-z\']+', lines)
+  return re.findall(r'[A-Za-z0-9\']+', lines)
 
 
 def run(argv=None):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
new file mode 100644
index 0000000..a95e5fa
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipelines in the
+Python SDK is in development and is not yet available for use.
+
+Currently, this test blocks until the job is manually terminated.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(
+        project=self.test_pipeline.get_option('project'))
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+    self._cleanup_pubsub()
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def _inject_numbers(self, topic, num_messages):
+    """Inject numbers as test data to PubSub."""
+    logging.debug('Injecting %d numbers to topic %s',
+                  num_messages, topic.full_name)
+    for n in range(num_messages):
+      topic.publish(str(n))
+
+  def _cleanup_pubsub(self):
+    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def tearDown(self):
+    self._cleanup_pubsub()
+
+  @attr('developing_test')
+  def test_streaming_wordcount_it(self):
+    # Set extra pipeline options for test purposes.
+    pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+    extra_opts = {'input_sub': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+
+    # Generate input data and inject it into PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
+    self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
+
+    # Get pipeline options from the --test-pipeline-options command-line
+    # argument and start the job by calling the pipeline's main function.
+    streaming_wordcount.run(
+        self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index aad3fc7..09a9190 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,13 +18,19 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function
 
+import time
+
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.runner import PipelineState
 
 __all__ = ['TestDataflowRunner']
 
+WAIT_TIMEOUT = 2 * 60
+
 
 class TestDataflowRunner(DataflowRunner):
   def run_pipeline(self, pipeline):
@@ -46,10 +52,39 @@ class TestDataflowRunner(DataflowRunner):
       print (
           'Found: https://console.cloud.google.com/dataflow/jobsDetail'
           '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
-    self.result.wait_until_finish()
+
+    if not options.view_as(StandardOptions).streaming:
+      self.result.wait_until_finish()
+    else:
+      # TODO: Ideally, we want to wait until workers start successfully.
+      self.wait_until_running()
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
     return self.result
+
+  def _is_in_terminate_state(self, job_state):
+    return job_state in [
+        PipelineState.STOPPED, PipelineState.DONE,
+        PipelineState.FAILED, PipelineState.CANCELLED,
+        PipelineState.UPDATED, PipelineState.DRAINED,
+    ]
+
+  def wait_until_running(self):
+    """Wait until the Dataflow pipeline terminates or enters RUNNING state."""
+    if not self.result.has_job:
+      raise IOError('Failed to get the Dataflow job id.')
+
+    start_time = time.time()
+    while time.time() - start_time <= WAIT_TIMEOUT:
+      job_state = self.result.state
+      if (self._is_in_terminate_state(job_state) or
+          self.result.state == PipelineState.RUNNING):
+        return job_state
+      time.sleep(5)
+
+    raise RuntimeError('Timeout after %d seconds while waiting for job %s '
+                       'enters RUNNING or terminate state.' %
+                       (WAIT_TIMEOUT, self.result.job_id))
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 46eeb75..155190c 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -98,8 +98,8 @@ class TestPipeline(Pipeline):
       options = PipelineOptions(self.options_list)
     super(TestPipeline, self).__init__(runner, options)
 
-  def run(self):
-    result = super(TestPipeline, self).run()
+  def run(self, test_runner_api=True):
+    result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_until_finish()
       assert state == PipelineState.DONE, "Pipeline execution failed."
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 5676186..f84b7f0 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -22,9 +22,11 @@ For internal use only; no backwards-compatibility guarantees.
 
 import hashlib
 import imp
+import logging
 import os
 import shutil
 import tempfile
+import time
 
 from mock import Mock
 from mock import patch
@@ -129,3 +131,49 @@ def delete_files(file_paths):
     raise RuntimeError('Clean up failed. Invalid file path: %s.' %
                        file_paths)
   FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+  """Wait until all PubSub subscriptions are created."""
+  return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+  """Wait until all PubSub topics are created."""
+  return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+  needs_wait = set(components)
+  start_time = time.time()
+  while time.time() - start_time <= timeout:
+    for c in components:
+      if c in needs_wait and c.exists():
+        needs_wait.remove(c)
+    if len(needs_wait) == 0:
+      return True
+    time.sleep(2)
+
+  raise RuntimeError(
+      'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
+      'They are %s.' %
+      (timeout, len(needs_wait), len(components), list(needs_wait)))
+
+
+def cleanup_subscriptions(subs):
+  """Clean up PubSub subscriptions if they exist."""
+  _cleanup_pubsub(subs)
+
+
+def cleanup_topics(topics):
+  """Clean up PubSub topics if they exist."""
+  _cleanup_pubsub(topics)
+
+
+def _cleanup_pubsub(components):
+  for c in components:
+    if c.exists():
+      c.delete()
+    else:
+      logging.debug('Cannot delete topic/subscription. %s does not exist.',
+                    c.full_name)
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index 0018c0e..ba0b940 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -22,6 +22,8 @@ import os
 import tempfile
 import unittest
 
+import mock
+
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.testing import test_utils as utils
@@ -57,8 +59,6 @@ class TestUtilsTest(unittest.TestCase):
       utils.delete_files([])
 
   def test_temp_dir_removes_files(self):
-    dir_path = ''
-    file_path = ''
     with utils.TempDir() as tempdir:
       dir_path = tempdir.get_path()
       file_path = tempdir.create_temp_file()
@@ -80,6 +80,57 @@ class TestUtilsTest(unittest.TestCase):
         self.assertEqual(f.readline(), 'line2\n')
         self.assertEqual(f.readline(), 'line3\n')
 
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
+    sub1 = mock.MagicMock()
+    sub1.exists.return_value = True
+    sub2 = mock.MagicMock()
+    sub2.exists.return_value = False
+    with self.assertRaises(RuntimeError) as error:
+      utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
+    self.assertTrue(sub1.exists.called)
+    self.assertTrue(sub2.exists.called)
+    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_topics_created_fails(self, patched_time_sleep):
+    topic1 = mock.MagicMock()
+    topic1.exists.return_value = True
+    topic2 = mock.MagicMock()
+    topic2.exists.return_value = False
+    with self.assertRaises(RuntimeError) as error:
+      utils.wait_for_subscriptions_created([topic1, topic2], timeout=0.1)
+    self.assertTrue(topic1.exists.called)
+    self.assertTrue(topic2.exists.called)
+    self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
+    sub1 = mock.MagicMock()
+    sub1.exists.return_value = True
+    self.assertTrue(
+        utils.wait_for_subscriptions_created([sub1], timeout=0.1))
+
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
+    topic1 = mock.MagicMock()
+    topic1.exists.return_value = True
+    self.assertTrue(
+        utils.wait_for_subscriptions_created([topic1], timeout=0.1))
+    self.assertTrue(topic1.exists.called)
+
+  def test_cleanup_subscriptions(self):
+    mock_sub = mock.MagicMock()
+    mock_sub.exist.return_value = True
+    utils.cleanup_subscriptions([mock_sub])
+    self.assertTrue(mock_sub.delete.called)
+
+  def test_cleanup_topics(self):
+    mock_topics = mock.MagicMock()
+    mock_topics.exist.return_value = True
+    utils.cleanup_subscriptions([mock_topics])
+    self.assertTrue(mock_topics.delete.called)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

-- 
To stop receiving notification emails like this one, please contact
altay@apache.org.