You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/10/17 20:19:33 UTC
[1/2] beam git commit: Wordcount on fnapi pipeline and IT test.
Repository: beam
Updated Branches:
refs/heads/master 020ef14af -> 0f7736dff
Wordcount on fnapi pipeline and IT test.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3050bcc5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3050bcc5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3050bcc5
Branch: refs/heads/master
Commit: 3050bcc524879a205d7f63f9250d7692d87a5b20
Parents: 020ef14
Author: Ahmet Altay <al...@google.com>
Authored: Fri Oct 13 17:52:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Oct 17 13:18:20 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_fnapi.py | 151 +++++++++++++++++++
.../apache_beam/examples/wordcount_it_test.py | 12 ++
.../apache_beam/runners/worker/sdk_worker.py | 4 +-
3 files changed, 165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/examples/wordcount_fnapi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_fnapi.py b/sdks/python/apache_beam/examples/wordcount_fnapi.py
new file mode 100644
index 0000000..5e92a23
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_fnapi.py
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow using the experimental FnApi.
+
+For the stable wordcount example see wordcount.py.
+"""
+
+# TODO(BEAM-2887): Merge with wordcount.py.
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+# TODO(BEAM-2887): Enable after the issue is fixed.
+# from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+class WordExtractingDoFn(beam.DoFn):
+  """Split lines of text into words while recording word metrics."""
+
+  def __init__(self):
+    super(WordExtractingDoFn, self).__init__()
+    namespace = self.__class__
+    self.words_counter = Metrics.counter(namespace, 'words')
+    self.word_lengths_counter = Metrics.counter(namespace, 'word_lengths')
+    self.word_lengths_dist = Metrics.distribution(namespace, 'word_len_dist')
+    self.empty_line_counter = Metrics.counter(namespace, 'empty_lines')
+
+  def process(self, element):
+    """Extracts the words found in one line of text.
+
+    Each word increments 'words' and feeds its length to 'word_lengths' and
+    'word_len_dist'; a line that strips to nothing increments 'empty_lines'.
+
+    Args:
+      element: a line of text.
+
+    Returns:
+      A list of the words (runs of ASCII letters and apostrophes) in the line.
+    """
+    # TODO(BEAM-3041): Move this import to the top of the file after the fix.
+    # Portable containers do not support save main session, so the import has
+    # to live here. Only needed for experimental jobs running with the FnApi.
+    import re
+    stripped = element.strip()
+    if not stripped:
+      self.empty_line_counter.inc(1)
+      return []  # Same result the regex would give for an empty string.
+    found = re.findall(r'[A-Za-z\']+', stripped)
+    for length in map(len, found):
+      self.words_counter.inc()
+      self.word_lengths_counter.inc(length)
+      self.word_lengths_dist.update(length)
+    return found
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the FnApi wordcount pipeline.
+
+  Args:
+    argv: command-line arguments to parse; argparse falls back to sys.argv
+      when this is None. Arguments argparse does not recognize are forwarded
+      to PipelineOptions.
+
+  Raises:
+    ValueError: if the 'beam_fn_api' experiment was not requested.
+  """
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+
+  # Ensure that the experiment flag is set explicitly by the user. This is an
+  # explicit raise rather than an assert so the check survives `python -O`.
+  experiments = pipeline_options.view_as(DebugOptions).experiments or []
+  if 'beam_fn_api' not in experiments:
+    raise ValueError(
+        'Enable beam_fn_api experiment, in order to run this example.')
+
+  # Read the text file[pattern] into a PCollection.
+  lines = p | 'read' >> ReadFromText(known_args.input)
+
+  # Count the occurrences of each word.
+  def count_ones(word_ones):
+    (word, ones) = word_ones
+    return (word, sum(ones))
+
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(unicode))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(count_ones))
+
+  # Format the counts into a PCollection of strings.
+  def format_result(word_count):
+    (word, count) = word_count
+    return '%s: %s' % (word, count)
+
+  # pylint: disable=unused-variable
+  output = counts | 'format' >> beam.Map(format_result)
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+
+  # TODO(BEAM-2887): Enable after the issue is fixed.
+  # output | 'write' >> WriteToText(known_args.output)
+
+  result = p.run()
+  result.wait_until_finish()
+
+  # Do not query metrics when creating a template which doesn't run
+  if (not hasattr(result, 'has_job')    # direct runner
+      or result.has_job):               # not just a template creation
+    empty_lines_filter = MetricsFilter().with_name('empty_lines')
+    query_result = result.metrics().query(empty_lines_filter)
+    if query_result['counters']:
+      empty_lines_counter = query_result['counters'][0]
+      logging.info('number of empty lines: %d', empty_lines_counter.committed)
+
+    word_lengths_filter = MetricsFilter().with_name('word_len_dist')
+    query_result = result.metrics().query(word_lengths_filter)
+    if query_result['distributions']:
+      word_lengths_dist = query_result['distributions'][0]
+      # %s rather than %d so a fractional mean is not truncated in the log.
+      logging.info('average word length: %s', word_lengths_dist.committed.mean)
+
+
+if __name__ == '__main__':
+ # Script entry point. Set the root logger level to INFO so that INFO records,
+ # such as the metric summaries logged by run(), pass the root level filter.
+ logging.getLogger().setLevel(logging.INFO)
+ run()
http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 21ff3ce..8532f49 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -25,6 +25,7 @@ from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr
from apache_beam.examples import wordcount
+from apache_beam.examples import wordcount_fnapi
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
@@ -64,6 +65,17 @@ class WordCountIT(unittest.TestCase):
# and start pipeline job by calling pipeline main function.
wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts))
+  @attr('IT')
+  def test_wordcount_fnapi_it(self):
+    """Runs the FnApi wordcount example as an integration test."""
+    # Get pipeline options from command argument: --test-pipeline-options,
+    # and add the experiment flag that wordcount_fnapi.run insists on.
+    pipeline_args = TestPipeline(
+        is_integration_test=True).get_full_options_as_args(
+            experiment='beam_fn_api',
+            on_success_matcher=PipelineStateMatcher())
+
+    # Start pipeline job by calling pipeline main function.
+    wordcount_fnapi.run(pipeline_args)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
http://git-wip-us.apache.org/repos/asf/beam/blob/3050bcc5/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d1b0c0e..b08e473 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -45,7 +45,7 @@ class SdkHarness(object):
self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1)
def run(self):
- contol_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
+ control_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
# TODO(robertwb): Wire up to new state api.
state_stub = None
self.worker = SdkWorker(state_stub, self._data_channel_factory)
@@ -60,7 +60,7 @@ class SdkHarness(object):
return
yield response
- for work_request in contol_stub.Control(get_responses()):
+ for work_request in control_stub.Control(get_responses()):
logging.info('Got work %s', work_request.instruction_id)
request_type = work_request.WhichOneof('request')
if request_type == ['process_bundle_progress']:
[2/2] beam git commit: This closes #3982
Posted by al...@apache.org.
This closes #3982
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0f7736df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0f7736df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0f7736df
Branch: refs/heads/master
Commit: 0f7736dff0b0930285f9d9a9ea5415e61633b8be
Parents: 020ef14 3050bcc
Author: Ahmet Altay <al...@google.com>
Authored: Tue Oct 17 13:18:37 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Oct 17 13:18:37 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_fnapi.py | 151 +++++++++++++++++++
.../apache_beam/examples/wordcount_it_test.py | 12 ++
.../apache_beam/runners/worker/sdk_worker.py | 4 +-
3 files changed, 165 insertions(+), 2 deletions(-)
----------------------------------------------------------------------