You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/01/12 20:15:31 UTC
[1/2] beam git commit: Updates snippets to use Beam text source and sink.
Repository: beam
Updated Branches:
refs/heads/python-sdk 86d420376 -> 4ba0b60a8
Updates snippets to use Beam text source and sink.
Removes the dependency that snippets_test has on the Dataflow native text sink.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30a01845
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30a01845
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30a01845
Branch: refs/heads/python-sdk
Commit: 30a018458f51a70c0e0d6e5431b219157af8a350
Parents: 86d4203
Author: Chamikara Jayalath <ch...@google.com>
Authored: Wed Jan 11 17:50:02 2017 -0800
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Wed Jan 11 17:50:18 2017 -0800
----------------------------------------------------------------------
.../examples/cookbook/custom_ptransform.py | 4 +-
.../apache_beam/examples/snippets/snippets.py | 100 +++++++++----------
.../examples/snippets/snippets_test.py | 96 ++++++++++++++++--
3 files changed, 136 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index ef6bc5a..cfbb99d 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -48,9 +48,9 @@ def run_count1(known_args, options):
"""Runs the first example pipeline."""
logging.info('Running first pipeline')
p = beam.Pipeline(options=options)
- (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+ (p | beam.io.ReadFromText(known_args.input)
| Count1()
- | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+ | beam.io.WriteToText(known_args.output))
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0d55125..e467353 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -48,11 +48,15 @@ class SnippetUtils(object):
from apache_beam.pipeline import PipelineVisitor
class RenameFiles(PipelineVisitor):
- """RenameFiles will rewire source and sink for unit testing.
+ """RenameFiles will rewire read/write paths for unit testing.
- RenameFiles will rewire the GCS files specified in the source and
- sink in the snippet pipeline to local files so the pipeline can be run as a
- unit test. This is as close as we can get to have code snippets that are
+ RenameFiles will replace the GCS files specified in the read and
+ write transforms to local files so the pipeline can be run as a
+ unit test. This assumes that read and write transforms defined in snippets
+ have already been replaced by transforms 'DummyReadForTesting' and
+ 'DummyWriteForTesting' (see snippets_test.py).
+
+ This is as close as we can get to have code snippets that are
executed and are also ready to presented in webdocs.
"""
@@ -60,14 +64,10 @@ class SnippetUtils(object):
self.renames = renames
def visit_transform(self, transform_node):
- if hasattr(transform_node.transform, 'source'):
- source = transform_node.transform.source
- source.file_path = self.renames['read']
- source.is_gcs_source = False
- elif hasattr(transform_node.transform, 'sink'):
- sink = transform_node.transform.sink
- sink.file_path = self.renames['write']
- sink.is_gcs_sink = False
+ if transform_node.full_label.find('DummyReadForTesting') >= 0:
+ transform_node.transform.fn.file_to_read = self.renames['read']
+ elif transform_node.full_label.find('DummyWriteForTesting') >= 0:
+ transform_node.transform.fn.file_to_write = self.renames['write']
def construct_pipeline(renames):
@@ -94,8 +94,7 @@ def construct_pipeline(renames):
# [END pipelines_constructing_creating]
# [START pipelines_constructing_reading]
- lines = p | beam.io.Read('ReadMyFile',
- beam.io.TextFileSource('gs://some/inputData.txt'))
+ lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
# [END pipelines_constructing_reading]
# [START pipelines_constructing_applying]
@@ -105,8 +104,8 @@ def construct_pipeline(renames):
# [START pipelines_constructing_writing]
filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
- filtered_words | 'WriteMyFile' >> beam.io.Write(
- beam.io.TextFileSink('gs://some/outputData.txt'))
+ filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
+ 'gs://some/outputData.txt')
# [END pipelines_constructing_writing]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -147,10 +146,11 @@ def model_pipelines(argv):
p = beam.Pipeline(options=pipeline_options)
(p
- | beam.io.Read(beam.io.TextFileSource(my_options.input))
+ | beam.io.ReadFromText(my_options.input)
| beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
- | beam.Map(lambda x: (x, 1)) | beam.combiners.Count.PerKey()
- | beam.io.Write(beam.io.TextFileSink(my_options.output)))
+ | beam.Map(lambda x: (x, 1))
+ | beam.combiners.Count.PerKey()
+ | beam.io.WriteToText(my_options.output))
p.run()
# [END model_pipelines]
@@ -184,7 +184,7 @@ def model_pcollection(argv):
'Whether \'tis nobler in the mind to suffer ',
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, '])
- | beam.io.Write(beam.io.TextFileSink(my_options.output)))
+ | beam.io.WriteToText(my_options.output))
p.run()
# [END model_pcollection]
@@ -241,8 +241,8 @@ def pipeline_options_remote(argv):
options.view_as(StandardOptions).runner = 'DirectRunner'
p = Pipeline(options=options)
- lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
- lines | beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.ReadFromText(my_input)
+ lines | beam.io.WriteToText(my_output)
p.run()
@@ -282,8 +282,8 @@ def pipeline_options_local(argv):
p = Pipeline(options=options)
# [END pipeline_options_local]
- lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
- lines | beam.io.Write(beam.io.TextFileSink(my_output))
+ lines = p | beam.io.ReadFromText(my_input)
+ lines | beam.io.WriteToText(my_output)
p.run()
@@ -304,9 +304,8 @@ def pipeline_options_command_line(argv):
# Create the Pipeline with remaining arguments.
p = beam.Pipeline(argv=pipeline_args)
- lines = p | beam.io.Read('ReadFromText',
- beam.io.TextFileSource(known_args.input))
- lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
+ lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
+ lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
# [END pipeline_options_command_line]
p.run()
@@ -344,7 +343,7 @@ def pipeline_logging(lines, output):
(p
| beam.Create(lines)
| beam.ParDo(ExtractWordsFn())
- | beam.io.Write(beam.io.TextFileSink(output)))
+ | beam.io.WriteToText(output))
p.run()
@@ -404,11 +403,11 @@ def pipeline_monitoring(renames):
# [START pipeline_monitoring_execution]
(p
# Read the lines of the input text.
- | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
+ | 'ReadLines' >> beam.io.ReadFromText(options.input)
# Count the words.
| CountWords()
# Write the formatted word counts to output.
- | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
+ | 'WriteCounts' >> beam.io.WriteToText(options.output))
# [END pipeline_monitoring_execution]
p.visit(SnippetUtils.RenameFiles(renames))
@@ -448,8 +447,8 @@ def examples_wordcount_minimal(renames):
(
# [START examples_wordcount_minimal_read]
- p | beam.io.Read(beam.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+ p | beam.io.ReadFromText(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt')
# [END examples_wordcount_minimal_read]
# [START examples_wordcount_minimal_pardo]
@@ -465,7 +464,7 @@ def examples_wordcount_minimal(renames):
# [END examples_wordcount_minimal_map]
# [START examples_wordcount_minimal_write]
- | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ | beam.io.WriteToText('gs://my-bucket/counts.txt')
# [END examples_wordcount_minimal_write]
)
@@ -502,8 +501,8 @@ def examples_wordcount_wordcount(renames):
p = beam.Pipeline(options=options)
# [END examples_wordcount_wordcount_options]
- lines = p | beam.io.Read(beam.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+ lines = p | beam.io.ReadFromText(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt')
# [START examples_wordcount_wordcount_composite]
class CountWords(beam.PTransform):
@@ -530,7 +529,7 @@ def examples_wordcount_wordcount(renames):
formatted = counts | beam.ParDo(FormatAsTextFn())
# [END examples_wordcount_wordcount_dofn]
- formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+ formatted | beam.io.WriteToText('gs://my-bucket/counts.txt')
p.visit(SnippetUtils.RenameFiles(renames))
p.run()
@@ -588,8 +587,8 @@ def examples_wordcount_debugging(renames):
p = beam.Pipeline(options=PipelineOptions())
filtered_words = (
p
- | beam.io.Read(beam.io.TextFileSource(
- 'gs://dataflow-samples/shakespeare/kinglear.txt'))
+ | beam.io.ReadFromText(
+ 'gs://dataflow-samples/shakespeare/kinglear.txt')
| 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| beam.combiners.Count.PerElement()
| 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
@@ -601,8 +600,7 @@ def examples_wordcount_debugging(renames):
output = (filtered_words
| 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
- | beam.io.Write(
- 'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
+ | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
p.visit(SnippetUtils.RenameFiles(renames))
p.run()
@@ -872,18 +870,16 @@ def model_textio(renames):
# [START model_textio_read]
p = beam.Pipeline(options=PipelineOptions())
# [START model_pipelineio_read]
- lines = p | beam.io.Read(
- 'ReadFromText',
- beam.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
+ lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+ 'gs://my_bucket/path/to/input-*.csv')
# [END model_pipelineio_read]
# [END model_textio_read]
# [START model_textio_write]
filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
# [START model_pipelineio_write]
- filtered_words | beam.io.Write(
- 'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
- file_name_suffix='.csv'))
+ filtered_words | 'WriteToText' >> beam.io.WriteToText(
+ 'gs://my_bucket/path/to/numbers', file_name_suffix='.csv')
# [END model_pipelineio_write]
# [END model_textio_write]
@@ -1014,7 +1010,7 @@ def model_composite_transform_example(contents, output_path):
(p
| beam.Create(contents)
| CountWords()
- | beam.io.Write(beam.io.TextFileSink(output_path)))
+ | beam.io.WriteToText(output_path))
p.run()
@@ -1050,7 +1046,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())
# [END model_multiple_pcollections_flatten]
- merged | beam.io.Write(beam.io.TextFileSink(output_path))
+ merged | beam.io.WriteToText(output_path)
p.run()
@@ -1083,7 +1079,7 @@ def model_multiple_pcollections_partition(contents, output_path):
([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
| beam.Flatten()
- | beam.io.Write(beam.io.TextFileSink(output_path)))
+ | beam.io.WriteToText(output_path))
p.run()
@@ -1113,7 +1109,7 @@ def model_group_by_key(contents, output_path):
# [END model_group_by_key_transform]
(grouped_words
| 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
- | beam.io.Write(beam.io.TextFileSink(output_path)))
+ | beam.io.WriteToText(output_path))
p.run()
@@ -1151,7 +1147,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
contact_lines = result | beam.Map(join_info)
# [END model_group_by_key_cogroupbykey_tuple]
- contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+ contact_lines | beam.io.WriteToText(output_path)
p.run()
@@ -1190,7 +1186,7 @@ def model_join_using_side_inputs(
contact_lines = names | beam.core.Map(
"CreateContacts", join_info, AsIter(emails), AsIter(phones))
# [END model_join_using_side_inputs]
- contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+ contact_lines | beam.io.WriteToText(output_path)
p.run()
http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 1a84a6e..a43e1e0 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -23,12 +23,12 @@ import os
import sys
import tempfile
import unittest
+import uuid
import apache_beam as beam
-from apache_beam import io
+from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
-from apache_beam.io import fileio
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
from apache_beam.utils.pipeline_options import TypeOptions
@@ -36,9 +36,6 @@ from apache_beam.examples.snippets import snippets
# pylint: disable=expression-not-assigned
-# Monky-patch to use native sink for file path re-writing.
-io.TextFileSink = fileio.NativeTextFileSink
-
class ParDoTest(unittest.TestCase):
"""Tests for dataflow/model/par-do."""
@@ -106,7 +103,8 @@ class ParDoTest(unittest.TestCase):
# pylint: disable=line-too-long
words = ['aa', 'bbc', 'defg']
# [START model_pardo_with_label]
- result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
+ result = words | 'CountUniqueLetters' >> beam.Map(
+ lambda word: len(set(word)))
# [END model_pardo_with_label]
self.assertEqual({1, 2, 4}, set(result))
@@ -350,6 +348,80 @@ class TypeHintsTest(unittest.TestCase):
class SnippetsTest(unittest.TestCase):
+ # Replacing text read/write transforms with dummy transforms for testing.
+ class DummyReadTransform(beam.PTransform):
+ """A transform that will replace beam.io.ReadFromText.
+
+ To be used for testing.
+ """
+
+ def __init__(self, file_to_read=None):
+ self.file_to_read = file_to_read
+
+ class ReadDoFn(beam.DoFn):
+
+ def __init__(self, file_to_read):
+ self.file_to_read = file_to_read
+ self.coder = coders.StrUtf8Coder()
+
+ def process(self, context):
+ pass
+
+ def finish_bundle(self, context):
+ assert self.file_to_read
+ for file_name in glob.glob(self.file_to_read):
+ with open(file_name) as file:
+ for record in file:
+ yield self.coder.decode(record.rstrip('\n'))
+
+ def expand(self, pcoll):
+ return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo(
+ SnippetsTest.DummyReadTransform.ReadDoFn(self.file_to_read))
+
+ class DummyWriteTransform(beam.PTransform):
+ """A transform that will replace beam.io.WriteToText.
+
+ To be used for testing.
+ """
+
+ def __init__(self, file_to_write=None, file_name_suffix=''):
+ self.file_to_write = file_to_write
+
+ class WriteDoFn(beam.DoFn):
+ def __init__(self, file_to_write):
+ self.file_to_write = file_to_write
+ self.file_obj = None
+ self.coder = coders.ToStringCoder()
+
+ def start_bundle(self, context):
+ assert self.file_to_write
+ self.file_to_write += str(uuid.uuid4())
+ self.file_obj = open(self.file_to_write, 'w')
+
+ def process(self, context):
+ assert self.file_obj
+ self.file_obj.write(self.coder.encode(context.element) + '\n')
+
+ def finish_bundle(self, context):
+ assert self.file_obj
+ self.file_obj.close()
+
+ def expand(self, pcoll):
+ return pcoll | 'DummyWriteForTesting' >> beam.ParDo(
+ SnippetsTest.DummyWriteTransform.WriteDoFn(self.file_to_write))
+
+ def setUp(self):
+ self.old_read_from_text = beam.io.ReadFromText
+ self.old_write_to_text = beam.io.WriteToText
+
+ # Monkey patching to allow testing pipelines defined in snippets.py using
+ # real data.
+ beam.io.ReadFromText = SnippetsTest.DummyReadTransform
+ beam.io.WriteToText = SnippetsTest.DummyWriteTransform
+
+ def tearDown(self):
+ beam.io.ReadFromText = self.old_read_from_text
+ beam.io.WriteToText = self.old_write_to_text
def create_temp_file(self, contents=''):
with tempfile.NamedTemporaryFile(delete=False) as f:
@@ -357,12 +429,16 @@ class SnippetsTest(unittest.TestCase):
return f.name
def get_output(self, path, sorted_output=True, suffix=''):
- with open(path + '-00000-of-00001' + suffix) as f:
- lines = f.readlines()
+ all_lines = []
+ for file_name in glob.glob(path + '*'):
+ with open(file_name) as f:
+ lines = f.readlines()
+ all_lines.extend([s.rstrip('\n') for s in lines])
+
if sorted_output:
- return sorted(s.rstrip('\n') for s in lines)
+ return sorted(s.rstrip('\n') for s in all_lines)
else:
- return [s.rstrip('\n') for s in lines]
+ return all_lines
def test_model_pipelines(self):
temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')
[2/2] beam git commit: Closes #1768
Posted by ro...@apache.org.
Closes #1768
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ba0b60a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ba0b60a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ba0b60a
Branch: refs/heads/python-sdk
Commit: 4ba0b60a8112928c54d62cb45c3bde2ab389361a
Parents: 86d4203 30a0184
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jan 12 12:15:14 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jan 12 12:15:14 2017 -0800
----------------------------------------------------------------------
.../examples/cookbook/custom_ptransform.py | 4 +-
.../apache_beam/examples/snippets/snippets.py | 100 +++++++++----------
.../examples/snippets/snippets_test.py | 96 ++++++++++++++++--
3 files changed, 136 insertions(+), 64 deletions(-)
----------------------------------------------------------------------