You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/31 02:51:05 UTC
[beam] branch master updated: [BEAM-5091] Wordcount integration test on DataflowRunner in Python 3
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e850105 [BEAM-5091] Wordcount integration test on DataflowRunner in Python 3
new e1ee71a Merge pull request #7652 from markflyhigh/py3-simple-wc
e850105 is described below
commit e8501057e6135d94f954eab92d13ed989eecb64b
Author: Mark Liu <ma...@google.com>
AuthorDate: Fri Jan 25 16:09:43 2019 -0800
[BEAM-5091] Wordcount integration test on DataflowRunner in Python 3
---
.../apache_beam/examples/wordcount_it_test.py | 68 ++++++++++++++++++++--
1 file changed, 64 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index d2b7dd1..3b4054a 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -19,20 +19,73 @@
from __future__ import absolute_import
+import argparse
import logging
+import re
import time
import unittest
from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
+from past.builtins import unicode
+import apache_beam as beam
from apache_beam.examples import wordcount
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_utils import delete_files
+def run_wordcount_without_save_main_session(argv):
+ """Defines and runs a simple version of wordcount pipeline.
+
+ This pipeline is the same as wordcount example except replace customized
+ DoFn class with transform function and disable save_main_session option
+ due to BEAM-6158."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--input',
+ dest='input',
+ default='gs://dataflow-samples/shakespeare/kinglear.txt',
+ help='Input file to process.')
+ parser.add_argument('--output',
+ dest='output',
+ required=True,
+ help='Output file to write results to.')
+ known_args, pipeline_args = parser.parse_known_args(argv)
+
+ p = beam.Pipeline(options=PipelineOptions(pipeline_args))
+
+ # Count the occurrences of each word.
+ def count_ones(word_ones):
+ (word, ones) = word_ones
+ return (word, sum(ones))
+
+ # Parse each line of input text into words.
+ def extract_words(line):
+ return re.findall(r'[\w\']+', line, re.UNICODE)
+
+ # Format the counts into a PCollection of strings.
+ def format_result(word_count):
+ (word, count) = word_count
+ return '%s: %d' % (word, count)
+
+ # pylint: disable=expression-not-assigned
+ (p | 'read' >> ReadFromText(known_args.input)
+ | 'split' >> (beam.ParDo(extract_words).with_output_types(unicode))
+ | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+ | 'group' >> beam.GroupByKey()
+ | 'count' >> beam.Map(count_ones)
+ | 'format' >> beam.Map(format_result)
+ | 'write' >> WriteToText(known_args.output))
+
+ result = p.run()
+ result.wait_until_finish()
+
+
class WordCountIT(unittest.TestCase):
# Enable nose tests running in parallel
@@ -44,13 +97,20 @@ class WordCountIT(unittest.TestCase):
@attr('IT')
def test_wordcount_it(self):
- self._run_wordcount_it()
+ self._run_wordcount_it(wordcount.run)
@attr('IT', 'ValidatesContainer')
def test_wordcount_fnapi_it(self):
- self._run_wordcount_it(experiment='beam_fn_api')
+ self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
+
+ @attr('Py3IT')
+ # TODO: Delete this test and use test_wordcount_fnapi_it instead
+ # once BEAM-6158 is fixed.
+ def test_wordcount_without_save_main_session(self):
+ self._run_wordcount_it(run_wordcount_without_save_main_session,
+ experiment='beam_fn_api')
- def _run_wordcount_it(self, **opts):
+ def _run_wordcount_it(self, run_wordcount, **opts):
test_pipeline = TestPipeline(is_integration_test=True)
# Set extra options to the pipeline for test purpose
@@ -72,7 +132,7 @@ class WordCountIT(unittest.TestCase):
# Get pipeline options from command argument: --test-pipeline-options,
# and start pipeline job by calling pipeline main function.
- wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+ run_wordcount(test_pipeline.get_full_options_as_args(**extra_opts))
if __name__ == '__main__':