You are viewing a plain text version of this content. The link to the canonical version ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/31 02:51:05 UTC

[beam] branch master updated: [BEAM-5091] Wordcount integration test on DataflowRunner in Python 3

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e850105  [BEAM-5091] Wordcount integration test on DataflowRunner in Python 3
     new e1ee71a  Merge pull request #7652 from markflyhigh/py3-simple-wc
e850105 is described below

commit e8501057e6135d94f954eab92d13ed989eecb64b
Author: Mark Liu <ma...@google.com>
AuthorDate: Fri Jan 25 16:09:43 2019 -0800

    [BEAM-5091] Wordcount integration test on DataflowRunner in Python 3
---
 .../apache_beam/examples/wordcount_it_test.py      | 68 ++++++++++++++++++++--
 1 file changed, 64 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index d2b7dd1..3b4054a 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -19,20 +19,73 @@
 
 from __future__ import absolute_import
 
+import argparse
 import logging
+import re
 import time
 import unittest
 
 from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
+from past.builtins import unicode
 
+import apache_beam as beam
 from apache_beam.examples import wordcount
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_utils import delete_files
 
 
+def run_wordcount_without_save_main_session(argv):
+  """Defines and runs a simple version of wordcount pipeline.
+
+  This pipeline is the same as wordcount example except replace customized
+  DoFn class with transform function and disable save_main_session option
+  due to BEAM-6158."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
+
+  # Count the occurrences of each word.
+  def count_ones(word_ones):
+    (word, ones) = word_ones
+    return (word, sum(ones))
+
+  # Parse each line of input text into words.
+  def extract_words(line):
+    return re.findall(r'[\w\']+', line, re.UNICODE)
+
+  # Format the counts into a PCollection of strings.
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %d' % (word, count)
+
+  # pylint: disable=expression-not-assigned
+  (p | 'read' >> ReadFromText(known_args.input)
+   | 'split' >> (beam.ParDo(extract_words).with_output_types(unicode))
+   | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+   | 'group' >> beam.GroupByKey()
+   | 'count' >> beam.Map(count_ones)
+   | 'format' >> beam.Map(format_result)
+   | 'write' >> WriteToText(known_args.output))
+
+  result = p.run()
+  result.wait_until_finish()
+
+
 class WordCountIT(unittest.TestCase):
 
   # Enable nose tests running in parallel
@@ -44,13 +97,20 @@ class WordCountIT(unittest.TestCase):
 
   @attr('IT')
   def test_wordcount_it(self):
-    self._run_wordcount_it()
+    self._run_wordcount_it(wordcount.run)
 
   @attr('IT', 'ValidatesContainer')
   def test_wordcount_fnapi_it(self):
-    self._run_wordcount_it(experiment='beam_fn_api')
+    self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
+
+  @attr('Py3IT')
+  # TODO: Delete this test and use test_wordcount_fnapi_it instead
+  # once BEAM-6158 is fixed.
+  def test_wordcount_without_save_main_session(self):
+    self._run_wordcount_it(run_wordcount_without_save_main_session,
+                           experiment='beam_fn_api')
 
-  def _run_wordcount_it(self, **opts):
+  def _run_wordcount_it(self, run_wordcount, **opts):
     test_pipeline = TestPipeline(is_integration_test=True)
 
     # Set extra options to the pipeline for test purpose
@@ -72,7 +132,7 @@ class WordCountIT(unittest.TestCase):
 
     # Get pipeline options from command argument: --test-pipeline-options,
     # and start pipeline job by calling pipeline main function.
-    wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+    run_wordcount(test_pipeline.get_full_options_as_args(**extra_opts))
 
 
 if __name__ == '__main__':