You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink did not survive the plain-text conversion).
Posted to commits@beam.apache.org by al...@apache.org on 2017/10/17 20:19:33 UTC

[1/2] beam git commit: Wordcount on fnapi pipeline and IT test.

Repository: beam
Updated Branches:
  refs/heads/master 020ef14af -> 0f7736dff


Wordcount on fnapi pipeline and IT test.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3050bcc5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3050bcc5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3050bcc5

Branch: refs/heads/master
Commit: 3050bcc524879a205d7f63f9250d7692d87a5b20
Parents: 020ef14
Author: Ahmet Altay <al...@google.com>
Authored: Fri Oct 13 17:52:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Oct 17 13:18:20 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_fnapi.py     | 151 +++++++++++++++++++
 .../apache_beam/examples/wordcount_it_test.py   |  12 ++
 .../apache_beam/runners/worker/sdk_worker.py    |   4 +-
 3 files changed, 165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/examples/wordcount_fnapi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_fnapi.py b/sdks/python/apache_beam/examples/wordcount_fnapi.py
new file mode 100644
index 0000000..5e92a23
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_fnapi.py
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow using the experimental FnApi.
+
+For the stable wordcount example see wordcount.py.
+"""
+
+# TODO(BEAM-2887): Merge with wordcount.py.
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+# TODO(BEAM-2887): Enable after the issue is fixed.
+# from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+class WordExtractingDoFn(beam.DoFn):
+  """Parses each line of input text into words and updates word metrics."""
+
+  def __init__(self):
+    super(WordExtractingDoFn, self).__init__()
+    self.words_counter = Metrics.counter(self.__class__, 'words')
+    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
+    self.word_lengths_dist = Metrics.distribution(
+        self.__class__, 'word_len_dist')
+    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
+
+  def process(self, element):
+    """Extracts the words of one line and updates the word metrics.
+
+    The element is a line of text; a blank line increments 'empty_lines'.
+
+    Args:
+      element: a single line of input text.
+
+    Returns:
+      A list of the words found in the line (possibly empty).
+    """
+
+    # TODO(BEAM-3041): Move this import to top of the file after the fix.
+    # Portable containers do not support save_main_session, so this import must
+    # live here. This is only needed for running experimental jobs with FnApi.
+    import re
+
+    text_line = element.strip()
+    if not text_line:
+      self.empty_line_counter.inc(1)
+    words = re.findall(r'[A-Za-z\']+', text_line)  # ASCII letters/apostrophes
+    for w in words:
+      self.words_counter.inc()
+      self.word_lengths_counter.inc(len(w))
+      self.word_lengths_dist.update(len(w))
+    return words
+
+
+def run(argv=None):
+  """Main entry point; builds and runs the FnApi wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+
+  # Require the beam_fn_api experiment (note: assert is skipped under -O).
+  debug_options = pipeline_options.view_as(DebugOptions)
+  use_fn_api = (
+      debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
+  assert use_fn_api, 'Enable beam_fn_api experiment, in order run this example.'
+
+  # Read the input file (or file pattern) into a PCollection of lines.
+  lines = p | 'read' >> ReadFromText(known_args.input)
+
+  # Count the occurrences of each word.
+  def count_ones(word_ones):
+    (word, ones) = word_ones
+    return (word, sum(ones))
+
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(unicode))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(count_ones))
+
+  # Format the counts into a PCollection of "word: count" strings.
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
+
+  # pylint: disable=unused-variable
+  output = counts | 'format' >> beam.Map(format_result)
+
+  # Writing 'output' with WriteToText is disabled until BEAM-2887 is fixed.
+  # pylint: disable=expression-not-assigned
+
+  # TODO(BEAM-2887): Enable after the issue is fixed.
+  # output | 'write' >> WriteToText(known_args.output)
+
+  result = p.run()
+  result.wait_until_finish()
+
+  # Skip metric queries when the run only created a template (no job ran).
+  if (not hasattr(result, 'has_job')    # direct runner
+      or result.has_job):               # not just a template creation
+    empty_lines_filter = MetricsFilter().with_name('empty_lines')
+    query_result = result.metrics().query(empty_lines_filter)
+    if query_result['counters']:
+      empty_lines_counter = query_result['counters'][0]
+      logging.info('number of empty lines: %d', empty_lines_counter.committed)
+
+    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+    query_result = result.metrics().query(word_lengths_filter)
+    if query_result['distributions']:
+      word_lengths_dist = query_result['distributions'][0]
+      logging.info('average word length: %d', word_lengths_dist.committed.mean)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 21ff3ce..8532f49 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -25,6 +25,7 @@ from hamcrest.core.core.allof import all_of
 from nose.plugins.attrib import attr
 
 from apache_beam.examples import wordcount
+from apache_beam.examples import wordcount_fnapi
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -64,6 +65,17 @@ class WordCountIT(unittest.TestCase):
     # and start pipeline job by calling pipeline main function.
     wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
 
+  @attr('IT')
+  def test_wordcount_fnapi_it(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+
+    # Pass the --test-pipeline-options args plus the beam_fn_api experiment
+    # and a PipelineStateMatcher success check to the example's run().
+    wordcount_fnapi.run(
+        test_pipeline.get_full_options_as_args(
+            experiment='beam_fn_api',
+            on_success_matcher=PipelineStateMatcher()))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)

http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d1b0c0e..b08e473 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -45,7 +45,7 @@ class SdkHarness(object):
     self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
 
   def run(self):
-    contol_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
+    control_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
     # TODO(robertwb): Wire up to new state api.
     state_stub = None
     self.worker = SdkWorker(state_stub, self._data_channel_factory)
@@ -60,7 +60,7 @@ class SdkHarness(object):
           return
         yield response
 
-    for work_request in contol_stub.Control(get_responses()):
+    for work_request in control_stub.Control(get_responses()):
       logging.info('Got work %s', work_request.instruction_id)
       request_type = work_request.WhichOneof('request')
       if request_type == ['process_bundle_progress']:


[2/2] beam git commit: This closes #3982

Posted by al...@apache.org.
This closes #3982


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f7736df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f7736df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f7736df

Branch: refs/heads/master
Commit: 0f7736dff0b0930285f9d9a9ea5415e61633b8be
Parents: 020ef14 3050bcc
Author: Ahmet Altay <al...@google.com>
Authored: Tue Oct 17 13:18:37 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Oct 17 13:18:37 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/wordcount_fnapi.py     | 151 +++++++++++++++++++
 .../apache_beam/examples/wordcount_it_test.py   |  12 ++
 .../apache_beam/runners/worker/sdk_worker.py    |   4 +-
 3 files changed, 165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------