You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/21 01:58:56 UTC
[beam] branch master updated: [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 776fd5a [BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)
776fd5a is described below
commit 776fd5a6ae21352a20c388ecf23822e6def13854
Author: Mark Liu <ma...@users.noreply.github.com>
AuthorDate: Tue Mar 20 18:58:52 2018 -0700
[BEAM-3861] Improve test infra in Python SDK for streaming end-to-end test (#4874)
Create Python End-to-end Test for Streaming WordCount
---
.../apache_beam/examples/streaming_wordcount.py | 2 +-
.../examples/streaming_wordcount_it_test.py | 102 +++++++++++++++++++++
.../runners/dataflow/test_dataflow_runner.py | 37 +++++++-
sdks/python/apache_beam/testing/test_pipeline.py | 4 +-
sdks/python/apache_beam/testing/test_utils.py | 48 ++++++++++
sdks/python/apache_beam/testing/test_utils_test.py | 55 ++++++++++-
6 files changed, 242 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 12f7351..7ef95d8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -36,7 +36,7 @@ from apache_beam.options.pipeline_options import StandardOptions
def split_fn(lines):
import re
- return re.findall(r'[A-Za-z\']+', lines)
+ return re.findall(r'[A-Za-z0-9\']+', lines)  # 0-9 added so numeric tokens count as words
def run(argv=None):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
new file mode 100644
index 0000000..a95e5fa
--- /dev/null
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for the streaming wordcount example.
+
+Important: End-to-end test infrastructure for streaming pipelines in the
+Python SDK is in development and is not yet available for use.
+
+Currently, this test blocks until the job is manually terminated.
+"""
+
+import logging
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.examples import streaming_wordcount
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'wc_topic_input'  # NOTE(review): fixed names; concurrent runs in one project would share them
+OUTPUT_TOPIC = 'wc_topic_output'
+INPUT_SUB = 'wc_subscription_input'
+OUTPUT_SUB = 'wc_subscription_output'
+# Number of integers (0..N-1) published to the input topic per test run.
+DEFAULT_INPUT_NUMBERS = 500
+
+
+class StreamingWordCountIT(unittest.TestCase):
+ # Publishes numbers to PubSub, then launches streaming_wordcount on them.
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+
+ # Set up PubSub environment: client plus topic/subscription handles.
+ from google.cloud import pubsub
+ self.pubsub_client = pubsub.Client(
+ project=self.test_pipeline.get_option('project'))
+ self.input_topic = self.pubsub_client.topic(INPUT_TOPIC)
+ self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC)
+ self.input_sub = self.input_topic.subscription(INPUT_SUB)
+ self.output_sub = self.output_topic.subscription(OUTPUT_SUB)
+
+ self._cleanup_pubsub()  # remove leftovers from earlier runs first
+ # Wait for the topics to exist before creating their subscriptions.
+ self.input_topic.create()
+ self.output_topic.create()
+ test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+ self.input_sub.create()
+ self.output_sub.create()
+
+ def _inject_numbers(self, topic, num_messages):
+ """Publish str(0) .. str(num_messages - 1) to topic, one message each."""
+ logging.debug('Injecting %d numbers to topic %s',
+ num_messages, topic.full_name)
+ for n in range(num_messages):
+ topic.publish(str(n))
+
+ def _cleanup_pubsub(self):  # subscriptions first, then topics; missing ones are skipped
+ test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
+ test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+ def tearDown(self):
+ self._cleanup_pubsub()
+
+ @attr('developing_test')  # nose attribute tag (nose.plugins.attrib)
+ def test_streaming_wordcount_it(self):
+ # Extra pipeline options for this test; the on-success matcher expects RUNNING.
+ pipeline_verifiers = [PipelineStateMatcher(PipelineState.RUNNING)]
+ extra_opts = {'input_sub': self.input_sub.full_name,
+ 'output_topic': self.output_topic.full_name,
+ 'on_success_matcher': all_of(*pipeline_verifiers)}
+
+ # Wait for the input subscription, then publish the test data to PubSub.
+ test_utils.wait_for_subscriptions_created([self.input_sub])
+ self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
+
+ # Get pipeline options from command argument: --test-pipeline-options,
+ # and start pipeline job by calling pipeline main function.
+ streaming_wordcount.run(
+ self.test_pipeline.get_full_options_as_args(**extra_opts))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.DEBUG)  # surface the test's logging.debug output
+ unittest.main()
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index aad3fc7..09a9190 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,13 +18,19 @@
"""Wrapper of Beam runners that's built for running and verifying e2e tests."""
from __future__ import print_function
+import time
+
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.runner import PipelineState
__all__ = ['TestDataflowRunner']
+WAIT_TIMEOUT = 2 * 60  # seconds that wait_until_running polls before giving up
+
class TestDataflowRunner(DataflowRunner):
def run_pipeline(self, pipeline):
@@ -46,10 +52,39 @@ class TestDataflowRunner(DataflowRunner):
print (
'Found: https://console.cloud.google.com/dataflow/jobsDetail'
'/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
- self.result.wait_until_finish()
+
+ if not options.view_as(StandardOptions).streaming:
+ self.result.wait_until_finish()
+ else:
+ # TODO: Ideally, we want to wait until workers start successfully.
+ self.wait_until_running()
if on_success_matcher:
from hamcrest import assert_that as hc_assert_that
hc_assert_that(self.result, pickler.loads(on_success_matcher))
return self.result
+
+ def _is_in_terminate_state(self, job_state):
+ """Return True for final job states (STOPPED means not yet started)."""
+ return job_state in [
+ PipelineState.DONE, PipelineState.FAILED, PipelineState.CANCELLED,
+ PipelineState.UPDATED, PipelineState.DRAINED,
+ ]
+
+ def wait_until_running(self):
+ """Wait until the Dataflow job is RUNNING or in a terminal state."""
+ if not self.result.has_job:
+ raise IOError('Failed to get the Dataflow job id.')
+
+ start_time = time.time()
+ while time.time() - start_time <= WAIT_TIMEOUT:
+ job_state = self.result.state  # query once per poll and reuse below
+ if (self._is_in_terminate_state(job_state) or
+ job_state == PipelineState.RUNNING):
+ return job_state
+ time.sleep(5)
+
+ raise RuntimeError('Timeout after %d seconds while waiting for job %s '
+ 'enters RUNNING or terminate state.' %
+ (WAIT_TIMEOUT, self.result.job_id))
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 46eeb75..155190c 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -98,8 +98,8 @@ class TestPipeline(Pipeline):
options = PipelineOptions(self.options_list)
super(TestPipeline, self).__init__(runner, options)
- def run(self):
- result = super(TestPipeline, self).run()
+ def run(self, test_runner_api=True):
+ result = super(TestPipeline, self).run(test_runner_api)
if self.blocking:
state = result.wait_until_finish()
assert state == PipelineState.DONE, "Pipeline execution failed."
diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py
index 5676186..f84b7f0 100644
--- a/sdks/python/apache_beam/testing/test_utils.py
+++ b/sdks/python/apache_beam/testing/test_utils.py
@@ -22,9 +22,11 @@ For internal use only; no backwards-compatibility guarantees.
import hashlib
import imp
+import logging
import os
import shutil
import tempfile
+import time
from mock import Mock
from mock import patch
@@ -129,3 +131,49 @@ def delete_files(file_paths):
raise RuntimeError('Clean up failed. Invalid file path: %s.' %
file_paths)
FileSystems.delete(file_paths)
+
+
+def wait_for_subscriptions_created(subs, timeout=60):
+ """Wait until all PubSub subscriptions in subs exist; timeout is in seconds."""
+ return _wait_until_all_exist(subs, timeout)
+
+
+def wait_for_topics_created(topics, timeout=60):
+ """Wait until all PubSub topics in topics exist; timeout is in seconds."""
+ return _wait_until_all_exist(topics, timeout)
+
+
+def _wait_until_all_exist(components, timeout):
+ """Poll c.exists() until all components exist; return True or raise."""
+ needs_wait = set(components)
+ start_time = time.time()
+ while True:
+ needs_wait = set(c for c in needs_wait if not c.exists())
+ if not needs_wait:
+ return True
+ if time.time() - start_time > timeout:
+ break
+ time.sleep(2)
+ raise RuntimeError(
+ 'Timeout after %d seconds. %d of %d topics/subscriptions not exist. '
+ 'They are %s.' %
+ (timeout, len(needs_wait), len(components), list(needs_wait)))
+
+
+def cleanup_subscriptions(subs):
+ """Delete the PubSub subscriptions in subs that exist; skip the others."""
+ _cleanup_pubsub(subs)
+
+
+def cleanup_topics(topics):
+ """Delete the PubSub topics in topics that exist; skip the others."""
+ _cleanup_pubsub(topics)
+
+
+def _cleanup_pubsub(components):  # delete each component that exists; debug-log the rest
+ for c in components:
+ if c.exists():
+ c.delete()
+ else:
+ logging.debug('Cannot delete topic/subscription. %s does not exist.',
+ c.full_name)
diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py
index 0018c0e..ba0b940 100644
--- a/sdks/python/apache_beam/testing/test_utils_test.py
+++ b/sdks/python/apache_beam/testing/test_utils_test.py
@@ -22,6 +22,8 @@ import os
import tempfile
import unittest
+import mock
+
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing import test_utils as utils
@@ -57,8 +59,6 @@ class TestUtilsTest(unittest.TestCase):
utils.delete_files([])
def test_temp_dir_removes_files(self):
- dir_path = ''
- file_path = ''
with utils.TempDir() as tempdir:
dir_path = tempdir.get_path()
file_path = tempdir.create_temp_file()
@@ -80,6 +80,57 @@ class TestUtilsTest(unittest.TestCase):
self.assertEqual(f.readline(), 'line2\n')
self.assertEqual(f.readline(), 'line3\n')
+ @mock.patch('time.sleep', return_value=None)  # no real sleeping while polling
+ def test_wait_for_subscriptions_created_fails(self, patched_time_sleep):
+ sub1 = mock.MagicMock()
+ sub1.exists.return_value = True
+ sub2 = mock.MagicMock()
+ sub2.exists.return_value = False
+ with self.assertRaises(RuntimeError) as error:
+ utils.wait_for_subscriptions_created([sub1, sub2], timeout=0.1)
+ self.assertTrue(sub1.exists.called)
+ self.assertTrue(sub2.exists.called)
+ self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_wait_for_topics_created_fails(self, patched_time_sleep):
+ topic1 = mock.MagicMock()
+ topic1.exists.return_value = True
+ topic2 = mock.MagicMock()
+ topic2.exists.return_value = False
+ with self.assertRaises(RuntimeError) as error:
+ utils.wait_for_topics_created([topic1, topic2], timeout=0.1)  # topics helper under test
+ self.assertTrue(topic1.exists.called)
+ self.assertTrue(topic2.exists.called)
+ self.assertTrue(error.exception.args[0].startswith('Timeout after'))
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_wait_for_subscriptions_created_succeeds(self, patched_time_sleep):
+ sub1 = mock.MagicMock()
+ sub1.exists.return_value = True
+ self.assertTrue(utils.wait_for_subscriptions_created([sub1], timeout=0.1))
+ self.assertTrue(sub1.exists.called)
+
+ @mock.patch('time.sleep', return_value=None)
+ def test_wait_for_topics_created_succeeds(self, patched_time_sleep):
+ topic1 = mock.MagicMock()
+ topic1.exists.return_value = True
+ self.assertTrue(
+ utils.wait_for_topics_created([topic1], timeout=0.1))
+ self.assertTrue(topic1.exists.called)
+
+ def test_cleanup_subscriptions(self):
+ mock_sub = mock.MagicMock()
+ mock_sub.exists.return_value = True  # the method _cleanup_pubsub actually calls
+ utils.cleanup_subscriptions([mock_sub])
+ self.assertTrue(mock_sub.delete.called)
+
+ def test_cleanup_topics(self):
+ mock_topic = mock.MagicMock()
+ mock_topic.exists.return_value = True  # the method _cleanup_pubsub actually calls
+ utils.cleanup_topics([mock_topic])
+ self.assertTrue(mock_topic.delete.called)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
--
To stop receiving notification emails like this one, please contact
altay@apache.org.