Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 04:31:12 UTC
[1/2] incubator-beam git commit: Adds a PTransform for Avro source.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk ce29ac7ab -> 71214b37a
Adds a PTransform for Avro source.
Wrapping a custom source in a PTransform is better than using the source directly through df.Read, since the PTransform can be extended without breaking end-user code.
Updates the documentation of avroio module.
Adds PTransform wrappers to custom sources and sinks in snippets.py.
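In rough outline, the recommended pattern is a thin public PTransform that delegates to a private source via iobase.Read. A minimal sketch follows (assuming the 2016-era Python SDK, where PTransforms override apply; _CountingSource stands for the private bounded source defined in the snippet below):

from apache_beam.io import iobase
from apache_beam.transforms.core import PTransform

class ReadFromCountingSource(PTransform):
  """Public wrapper; end users never depend on _CountingSource directly."""

  def __init__(self, label, count, **kwargs):
    super(ReadFromCountingSource, self).__init__(label, **kwargs)
    self._count = count

  def apply(self, pcoll):
    # Delegating to iobase.Read keeps the source an implementation detail
    # that can evolve without breaking callers of this transform.
    return pcoll | iobase.Read(_CountingSource(self._count))

# Usage: numbers = p | ReadFromCountingSource('ProduceNumbers', count)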
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/36ea1071
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/36ea1071
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/36ea1071
Branch: refs/heads/python-sdk
Commit: 36ea1071e12a52c4e7a8ded7e5c025f5c735fc65
Parents: ce29ac7
Author: Charles Chen <cc...@google.com>
Authored: Fri Jul 15 14:37:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 21:30:37 2016 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 125 ++++++++++++++++---
.../examples/snippets/snippets_test.py | 24 +++-
sdks/python/apache_beam/io/avroio.py | 87 ++++++++-----
sdks/python/apache_beam/io/avroio_test.py | 10 +-
4 files changed, 191 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 57ddbee..3658619 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -612,11 +612,21 @@ def examples_wordcount_debugging(renames):
def model_custom_source(count):
"""Demonstrates creating a new custom source and using it in a pipeline.
- Defines a new source 'CountingSource' that produces integers starting from 0
+ Defines a new source ``CountingSource`` that produces integers starting from 0
up to a given size.
Uses the new source in an example pipeline.
+ Additionally demonstrates how a source should be implemented using a
+ ``PTransform``. This is the recommended way to develop sources that are to
+ be distributed to a large number of end users.
+
+ This method runs two pipelines.
+ (1) A pipeline that uses ``CountingSource`` directly, via the ``df.Read``
+ transform.
+ (2) A pipeline that uses a custom ``PTransform`` that wraps
+ ``CountingSource``.
+
Args:
count: the size of the counting source to be used in the pipeline
demonstrated in this method.
@@ -625,6 +635,7 @@ def model_custom_source(count):
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
+ from apache_beam.transforms.core import PTransform
from apache_beam.utils.options import PipelineOptions
# Defining a new source.
@@ -681,34 +692,81 @@ def model_custom_source(count):
p.run()
+ # We recommend starting Source class names with an underscore to discourage
+ # using the Source class directly when a PTransform for the source is
+ # available. We simulate that here by simply extending the previous Source
+ # class.
+ class _CountingSource(CountingSource):
+ pass
+
+ # [START model_custom_source_new_ptransform]
+ class ReadFromCountingSource(PTransform):
+
+ def __init__(self, label, count, **kwargs):
+ super(ReadFromCountingSource, self).__init__(label, **kwargs)
+ self._count = count
+
+ def apply(self, pcoll):
+ return pcoll | iobase.Read(_CountingSource(self._count))
+ # [END model_custom_source_new_ptransform]
+
+ # [START model_custom_source_use_ptransform]
+ p = beam.Pipeline(options=PipelineOptions())
+ numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+ # [END model_custom_source_use_ptransform]
+
+ lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+ beam.assert_that(
+ lines, beam.equal_to(
+ ['line ' + str(number) for number in range(0, count)]))
+
+ p.run()
+
-def model_custom_sink(simplekv, KVs, final_table_name):
+def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
+ final_table_name_with_ptransform):
"""Demonstrates creating a new custom sink and using it in a pipeline.
- Defines a new sink 'SimpleKVSink' that demonstrates writing to a simple
- key-value based storage system.
+ Defines a new sink ``SimpleKVSink`` that demonstrates writing to a simple
+ key-value based storage system, which has the following API.
+
+ simplekv.connect(url) -
+ connects to the storage system and returns an access token which can be
+ used to perform further operations
+ simplekv.open_table(access_token, table_name) -
+ creates a table named 'table_name'. Returns a table object.
+ simplekv.write_to_table(access_token, table, key, value) -
+ writes a key-value pair to the given table.
+ simplekv.rename_table(access_token, old_name, new_name) -
+ renames the table named 'old_name' to 'new_name'.
Uses the new sink in an example pipeline.
+ Additionally demonstrates how a sink should be implemented using a
+ ``PTransform``. This is the recommended way to develop sinks that are to be
+ distributed to a large number of end users.
+
+ This method runs two pipelines.
+ (1) A pipeline that uses ``SimpleKVSink`` directly, via the ``df.Write``
+ transform.
+ (2) A pipeline that uses a custom ``PTransform`` that wraps
+ ``SimpleKVSink``.
+
Args:
- simplekv: an object that mocks the key-value storage. The API of the
- key-value storage consists of following methods.
- simplekv.connect(url) -
- connects to the storage and returns an access token
- which can be used to perform further operations
- simplekv.open_table(access_token, table_name) -
- creates a table named 'table_name'. Returns a table object.
- simplekv.write_to_table(table, access_token, key, value) -
- writes a key-value pair to the given table.
- simplekv.rename_table(access_token, old_name, new_name) -
- renames the table named 'old_name' to 'new_name'.
+ simplekv: an object that mocks the key-value storage.
KVs: the set of key-value pairs to be written in the example pipeline.
- final_table_name: the prefix of final set of tables to be created by the
- example pipeline.
+ final_table_name_no_ptransform: the prefix of final set of tables to be
+ created by the example pipeline that uses
+ ``SimpleKVSink`` directly.
+ final_table_name_with_ptransform: the prefix of final set of tables to be
+ created by the example pipeline that uses
+ a ``PTransform`` that wraps
+ ``SimpleKVSink``.
"""
import apache_beam as beam
from apache_beam.io import iobase
+ from apache_beam.transforms.core import PTransform
from apache_beam.utils.options import PipelineOptions
# Defining the new sink.
@@ -751,6 +809,8 @@ def model_custom_sink(simplekv, KVs, final_table_name):
return self._table_name
# [END model_custom_sink_new_writer]
+ final_table_name = final_table_name_no_ptransform
+
# Using the new sink in an example pipeline.
# [START model_custom_sink_use_new_sink]
p = beam.Pipeline(options=PipelineOptions())
@@ -764,6 +824,37 @@ def model_custom_sink(simplekv, KVs, final_table_name):
p.run()
+ # We recommend starting Sink class names with an underscore to
+ # discourage using the Sink class directly when a PTransform for the sink is
+ # available. We simulate that here by simply extending the previous Sink
+ # class.
+ class _SimpleKVSink(SimpleKVSink):
+ pass
+
+ # [START model_custom_sink_new_ptransform]
+ class WriteToKVSink(PTransform):
+
+ def __init__(self, label, url, final_table_name, **kwargs):
+ super(WriteToKVSink, self).__init__(label, **kwargs)
+ self._url = url
+ self._final_table_name = final_table_name
+
+ def apply(self, pcoll):
+ return pcoll | iobase.Write(_SimpleKVSink(self._url,
+ self._final_table_name))
+ # [END model_custom_sink_new_ptransform]
+
+ final_table_name = final_table_name_with_ptransform
+
+ # [START model_custom_sink_use_ptransform]
+ p = beam.Pipeline(options=PipelineOptions())
+ kvs = p | beam.core.Create('CreateKVs', KVs)
+ kvs | WriteToKVSink('WriteToSimpleKV',
+ 'http://url_to_simple_kv/', final_table_name)
+ # [END model_custom_sink_use_ptransform]
+
+ p.run()
+
def model_textio(renames):
"""Using a Read and Write transform to read/write text files.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index eaf1b3a..7888263 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -393,6 +393,7 @@ class SnippetsTest(unittest.TestCase):
tempdir_name = tempfile.mkdtemp()
class SimpleKV(object):
+
def __init__(self, tmp_dir):
self._dummy_token = 'dummy_token'
self._tmp_dir = tmp_dir
@@ -426,19 +427,32 @@ class SnippetsTest(unittest.TestCase):
snippets.model_custom_sink(
SimpleKV(tempdir_name),
[('key' + str(i), 'value' + str(i)) for i in range(100)],
- 'final_table')
+ 'final_table_no_ptransform', 'final_table_with_ptransform')
+
+ expected_output = [
+ 'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
- glob_pattern = tempdir_name + os.sep + 'final_table*'
+ glob_pattern = tempdir_name + os.sep + 'final_table_no_ptransform*'
output_files = glob.glob(glob_pattern)
- assert len(output_files) > 0
+ assert output_files
+
+ received_output = []
+ for file_name in output_files:
+ with open(file_name) as f:
+ for line in f:
+ received_output.append(line.rstrip(os.linesep))
+
+ self.assertItemsEqual(expected_output, received_output)
+
+ glob_pattern = tempdir_name + os.sep + 'final_table_with_ptransform*'
+ output_files = glob.glob(glob_pattern)
+ assert output_files
received_output = []
for file_name in output_files:
with open(file_name) as f:
for line in f:
received_output.append(line.rstrip(os.linesep))
- expected_output = [
- 'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
self.assertItemsEqual(expected_output, received_output)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 25412af..4d1b245 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -26,44 +26,71 @@ from avro import io as avro_io
from avro import schema
from apache_beam.io import filebasedsource
+from apache_beam.io.iobase import Read
+from apache_beam.transforms import PTransform
+
+
+class ReadFromAvro(PTransform):
+ """A ``PTransform`` for reading avro files."""
+
+ def __init__(self, label=None, file_pattern=None, min_bundle_size=0,
+ **kwargs):
+ """Initializes ``ReadFromAvro``.
+
+ Uses source '_AvroSource' to read a set of Avro files defined by a given
+ file pattern.
+ If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
+ files, a ``PCollection`` for the records in these Avro files can be created
+ in the following manner.
+ p = df.Pipeline(argv=pipeline_args)
+ records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+
+ Each element of this ``PCollection`` will contain a single record read from a
+ source. Records that are of simple types will be mapped into corresponding
+ Python types. Records that are of Avro type 'RECORD' will be mapped to
+ Python dictionaries that comply with the schema contained in the Avro file
+ that contains those records. In this case, keys of each dictionary
+ will contain the corresponding field names and will be of type ``string``
+ while the values of the dictionary will be of the type defined in the
+ corresponding Avro schema.
+ For example, if the schema of the Avro file is the following.
+ {"namespace": "example.avro","type": "record","name": "User","fields":
+ [{"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]}]}
+ Then records generated by ``ReadFromAvro`` will be dictionaries of the
+ following form.
+ {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}.
+ Args:
+ label: label of the PTransform.
+ file_pattern: the set of files to be read.
+ min_bundle_size: the minimum size, in bytes, to be considered when
+ splitting the input into bundles.
+ **kwargs: Additional keyword arguments to be passed to the base class.
+ """
+ super(ReadFromAvro, self).__init__(label, **kwargs)
+
+ self._file_pattern = file_pattern
+ self._min_bundle_size = min_bundle_size
-class AvroSource(filebasedsource.FileBasedSource):
+ def apply(self, pcoll):
+ return pcoll.pipeline | Read(_AvroSource(
+ file_pattern=self._file_pattern, min_bundle_size=self._min_bundle_size))
+
+
+class _AvroSource(filebasedsource.FileBasedSource):
"""A source for reading Avro files.
- ``AvroSource`` is implemented using the file-based source framework available
+ ``_AvroSource`` is implemented using the file-based source framework available
in module 'filebasedsource'. Hence please refer to module 'filebasedsource'
to fully understand how this source implements operations common to all
file-based sources such as file-pattern expansion and splitting into bundles
for parallel processing.
-
- If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
- files, a ``PCollection`` for the records in these Avro files can be created in
- the following manner.
-
- p = df.Pipeline(argv=pipeline_args)
- records = p | df.io.Read('Read', avroio.AvroSource('/mypath/myavrofiles*'))
-
- Each record of this ``PCollection`` will contain a Python dictionary that
- complies with the schema contained in the Avro file that contains that
- particular record.
- Keys of each dictionary will contain the corresponding field names and will
- be of type ``string``. Values of the dictionary will be of the type defined in
- the corresponding Avro schema.
-
- For example, if schema of the Avro file is following.
- {"namespace": "example.avro","type": "record","name": "User","fields":
- [{"name": "name", "type": "string"},
- {"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_color", "type": ["string", "null"]}]}
-
- Then records generated by ``AvroSource`` will be dictionaries of the following
- form.
- {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}).
"""
def __init__(self, file_pattern, min_bundle_size=0):
- super(AvroSource, self).__init__(file_pattern, min_bundle_size)
+ super(_AvroSource, self).__init__(file_pattern, min_bundle_size)
self._avro_schema = None
self._codec = None
self._sync_marker = None
@@ -163,9 +190,9 @@ class AvroSource(filebasedsource.FileBasedSource):
block_size = decoder.read_long()
block_bytes = decoder.read(block_size)
- return AvroSource.AvroBlock(block_bytes, num_records,
- self._avro_schema,
- self._codec, f.tell()) if block_bytes else None
+ return _AvroSource.AvroBlock(block_bytes, num_records,
+ self._avro_schema,
+ self._codec, f.tell()) if block_bytes else None
class AvroUtils(object):
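Put together, a pipeline built on the new transform might look like the sketch below (the label and file pattern are hypothetical; the element shape assumes the User schema from the docstring above):

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro
from apache_beam.utils.options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())
# Each element is a dict such as
# {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}.
records = p | ReadFromAvro('ReadUsers', '/mypath/myavrofiles*')
names = records | beam.core.Map(lambda user: user['name'])
p.run()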
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 060e211..51bc375 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -19,9 +19,13 @@ import logging
import os
import tempfile
import unittest
-from apache_beam.io import avroio
+
from apache_beam.io import filebasedsource
from apache_beam.io import source_test_utils
+
+ # Importing the following private class for testing purposes.
+from apache_beam.io.avroio import _AvroSource as AvroSource
+
import avro.datafile
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
@@ -81,7 +85,7 @@ class TestAvro(unittest.TestCase):
def _run_avro_test(
self, pattern, desired_bundle_size, perform_splitting, expected_result):
- source = avroio.AvroSource(pattern)
+ source = AvroSource(pattern)
read_records = []
if perform_splitting:
@@ -148,7 +152,7 @@ class TestAvro(unittest.TestCase):
try:
avro.datafile.SYNC_INTERVAL = 5
file_name = self._write_data(count=20)
- source = avroio.AvroSource(file_name)
+ source = AvroSource(file_name)
splits = [split for split in source.split(
desired_bundle_size=float('inf'))]
assert len(splits) == 1
[2/2] incubator-beam git commit: Closes #672
Posted by dh...@apache.org.
Closes #672
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/71214b37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/71214b37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/71214b37
Branch: refs/heads/python-sdk
Commit: 71214b37ac681f01580c22301d4b3ed6191b6e23
Parents: ce29ac7 36ea107
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 19 21:30:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 21:30:38 2016 -0700
----------------------------------------------------------------------
.../apache_beam/examples/snippets/snippets.py | 125 ++++++++++++++++---
.../examples/snippets/snippets_test.py | 24 +++-
sdks/python/apache_beam/io/avroio.py | 87 ++++++++-----
sdks/python/apache_beam/io/avroio_test.py | 10 +-
4 files changed, 191 insertions(+), 55 deletions(-)
----------------------------------------------------------------------