You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 04:31:12 UTC

[1/2] incubator-beam git commit: Adds a PTransform for Avro source.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ce29ac7ab -> 71214b37a


Adds a PTransform for Avro source.

Wrapping a custom source in a PTransform is better than using the source directly through df.Read, since the PTransform can be extended without breaking end-user code.

Updates the documentation of avroio module.

Adds PTransform wrappers to custom sources and sinks in snippets.py.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/36ea1071
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/36ea1071
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/36ea1071

Branch: refs/heads/python-sdk
Commit: 36ea1071e12a52c4e7a8ded7e5c025f5c735fc65
Parents: ce29ac7
Author: Charles Chen <cc...@google.com>
Authored: Fri Jul 15 14:37:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 21:30:37 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 125 ++++++++++++++++---
 .../examples/snippets/snippets_test.py          |  24 +++-
 sdks/python/apache_beam/io/avroio.py            |  87 ++++++++-----
 sdks/python/apache_beam/io/avroio_test.py       |  10 +-
 4 files changed, 191 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 57ddbee..3658619 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -612,11 +612,21 @@ def examples_wordcount_debugging(renames):
 def model_custom_source(count):
   """Demonstrates creating a new custom source and using it in a pipeline.
 
-  Defines a new source 'CountingSource' that produces integers starting from 0
+  Defines a new source ``CountingSource`` that produces integers starting from 0
   up to a given size.
 
   Uses the new source in an example pipeline.
 
+  Additionally demonstrates how a source should be implemented using a
+  ``PTransform``. This is the recommended way to develop sources that are to be
+  distributed to a large number of end users.
+
+  This method runs two pipelines.
+  (1) A pipeline that uses ``CountingSource`` directly using the ``df.Read``
+      transform.
+  (2) A pipeline that uses a custom ``PTransform`` that wraps
+      ``CountingSource``.
+
   Args:
     count: the size of the counting source to be used in the pipeline
            demonstrated in this method.
@@ -625,6 +635,7 @@ def model_custom_source(count):
   import apache_beam as beam
   from apache_beam.io import iobase
   from apache_beam.io.range_trackers import OffsetRangeTracker
+  from apache_beam.transforms.core import PTransform
   from apache_beam.utils.options import PipelineOptions
 
   # Defining a new source.
@@ -681,34 +692,81 @@ def model_custom_source(count):
 
   p.run()
 
+  # We recommend that users start Source class names with an underscore to
+  # discourage using the Source class directly when a PTransform for the source
+  # is available. We simulate that here by simply extending the previous Source
+  # class.
+  class _CountingSource(CountingSource):
+    pass
+
+  # [START model_custom_source_new_ptransform]
+  class ReadFromCountingSource(PTransform):
+    """Reads ``count`` integers via ``_CountingSource``."""
+    def __init__(self, label, count, **kwargs):
+      super(ReadFromCountingSource, self).__init__(label, **kwargs)
+      self._count = count
+
+    def apply(self, pcoll):
+      return pcoll | iobase.Read(_CountingSource(self._count))
+  # [END model_custom_source_new_ptransform]
+
+  # [START model_custom_source_use_ptransform]
+  p = beam.Pipeline(options=PipelineOptions())
+  numbers = p | ReadFromCountingSource('ProduceNumbers', count)
+  # [END model_custom_source_use_ptransform]
+
+  lines = numbers | beam.core.Map(lambda number: 'line %d' % number)
+  beam.assert_that(
+      lines, beam.equal_to(
+          ['line ' + str(number) for number in range(0, count)]))
+
+  p.run()
+
 
-def model_custom_sink(simplekv, KVs, final_table_name):
+def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
+                      final_table_name_with_ptransform):
   """Demonstrates creating a new custom sink and using it in a pipeline.
 
-  Defines a new sink 'SimpleKVSink' that demonstrates writing to a simple
-  key-value based storage system.
+  Defines a new sink ``SimpleKVSink`` that demonstrates writing to a simple
+  key-value based storage system which has the following API.
+
+    simplekv.connect(url) -
+        connects to the storage system and returns an access token which can be
+        used to perform further operations
+    simplekv.open_table(access_token, table_name) -
+        creates a table named 'table_name'. Returns a table object.
+    simplekv.write_to_table(access_token, table, key, value) -
+        writes a key-value pair to the given table.
+    simplekv.rename_table(access_token, old_name, new_name) -
+        renames the table named 'old_name' to 'new_name'.
 
   Uses the new sink in an example pipeline.
 
+  Additionally demonstrates how a sink should be implemented using a
+  ``PTransform``. This is the recommended way to develop sinks that are to be
+  distributed to a large number of end users.
+
+  This method runs two pipelines.
+  (1) A pipeline that uses ``SimpleKVSink`` directly using the ``df.Write``
+      transform.
+  (2) A pipeline that uses a custom ``PTransform`` that wraps
+      ``SimpleKVSink``.
+
   Args:
-    simplekv: an object that mocks the key-value storage. The API of the
-              key-value storage consists of following methods.
-              simplekv.connect(url) -
-                  connects to the storage and returns an access token
-                  which can be used to perform further operations
-              simplekv.open_table(access_token, table_name) -
-                  creates a table named 'table_name'. Returns a table object.
-              simplekv.write_to_table(table, access_token, key, value) -
-                  writes a key-value pair to the given table.
-              simplekv.rename_table(access_token, old_name, new_name) -
-                  renames the table named 'old_name' to 'new_name'.
+    simplekv: an object that mocks the key-value storage.
     KVs: the set of key-value pairs to be written in the example pipeline.
-    final_table_name: the prefix of final set of tables to be created by the
-                      example pipeline.
+    final_table_name_no_ptransform: the prefix of final set of tables to be
+                                    created by the example pipeline that uses
+                                    ``SimpleKVSink`` directly.
+    final_table_name_with_ptransform: the prefix of final set of tables to be
+                                      created by the example pipeline that uses
+                                      a ``PTransform`` that wraps
+                                      ``SimpleKVSink``.
   """
 
   import apache_beam as beam
   from apache_beam.io import iobase
+  from apache_beam.transforms.core import PTransform
   from apache_beam.utils.options import PipelineOptions
 
   # Defining the new sink.
@@ -751,6 +809,8 @@ def model_custom_sink(simplekv, KVs, final_table_name):
       return self._table_name
   # [END model_custom_sink_new_writer]
 
+  final_table_name = final_table_name_no_ptransform
+
   # Using the new sink in an example pipeline.
   # [START model_custom_sink_use_new_sink]
   p = beam.Pipeline(options=PipelineOptions())
@@ -764,6 +824,37 @@ def model_custom_sink(simplekv, KVs, final_table_name):
 
   p.run()
 
+  # We recommend that users start Sink class names with an underscore to
+  # discourage using the Sink class directly when a PTransform for the sink is
+  # available. We simulate that here by simply extending the previous Sink
+  # class.
+  class _SimpleKVSink(SimpleKVSink):
+    pass
+
+  # [START model_custom_sink_new_ptransform]
+  class WriteToKVSink(PTransform):
+
+    def __init__(self, label, url, final_table_name, **kwargs):
+      super(WriteToKVSink, self).__init__(label, **kwargs)
+      self._url = url
+      self._final_table_name = final_table_name
+
+    def apply(self, pcoll):
+      return pcoll | iobase.Write(_SimpleKVSink(self._url,
+                                                self._final_table_name))
+  # [END model_custom_sink_new_ptransform]
+
+  final_table_name = final_table_name_with_ptransform
+
+  # [START model_custom_sink_use_ptransform]
+  p = beam.Pipeline(options=PipelineOptions())
+  kvs = p | beam.core.Create('CreateKVs', KVs)
+  kvs | WriteToKVSink('WriteToSimpleKV',
+                      'http://url_to_simple_kv/', final_table_name)
+  # [END model_custom_sink_use_ptransform]
+
+  p.run()
+
 
 def model_textio(renames):
   """Using a Read and Write transform to read/write text files.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index eaf1b3a..7888263 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -393,6 +393,7 @@ class SnippetsTest(unittest.TestCase):
     tempdir_name = tempfile.mkdtemp()
 
     class SimpleKV(object):
+
       def __init__(self, tmp_dir):
         self._dummy_token = 'dummy_token'
         self._tmp_dir = tmp_dir
@@ -426,19 +427,32 @@ class SnippetsTest(unittest.TestCase):
     snippets.model_custom_sink(
         SimpleKV(tempdir_name),
         [('key' + str(i), 'value' + str(i)) for i in range(100)],
-        'final_table')
+        'final_table_no_ptransform', 'final_table_with_ptransform')
+
+    expected_output = [
+        'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
 
-    glob_pattern = tempdir_name + os.sep + 'final_table*'
+    glob_pattern = tempdir_name + os.sep + 'final_table_no_ptransform*'
     output_files = glob.glob(glob_pattern)
-    assert len(output_files) > 0
+    assert output_files
+
+    received_output = []
+    for file_name in output_files:
+      with open(file_name) as f:
+        for line in f:
+          received_output.append(line.rstrip(os.linesep))
+
+    self.assertItemsEqual(expected_output, received_output)
+
+    glob_pattern = tempdir_name + os.sep + 'final_table_with_ptransform*'
+    output_files = glob.glob(glob_pattern)
+    assert output_files
 
     received_output = []
     for file_name in output_files:
       with open(file_name) as f:
         for line in f:
           received_output.append(line.rstrip(os.linesep))
-    expected_output = [
-        'key' + str(i) + ':' + 'value' + str(i) for i in range(100)]
 
     self.assertItemsEqual(expected_output, received_output)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 25412af..4d1b245 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -26,44 +26,71 @@ from avro import io as avro_io
 from avro import schema
 
 from apache_beam.io import filebasedsource
+from apache_beam.io.iobase import Read
+from apache_beam.transforms import PTransform
+
+
+class ReadFromAvro(PTransform):
+  """A ``PTransform`` for reading avro files."""
+
+  def __init__(self, label=None, file_pattern=None, min_bundle_size=0,
+               **kwargs):
+    """Initializes ``ReadFromAvro``.
+
+    Uses source '_AvroSource' to read a set of Avro files defined by a given
+    file pattern.
+    If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
+    files, a ``PCollection`` for the records in these Avro files can be created
+    in the following manner.
+      p = df.Pipeline(argv=pipeline_args)
+      records = p | df.io.ReadFromAvro('Read', '/mypath/myavrofiles*')
+
+    Each record of this ``PCollection`` will contain a single record read from a
+    source. Records that are of simple types will be mapped into corresponding
+    Python types. Records that are of Avro type 'RECORD' will be mapped to
+    Python dictionaries that comply with the schema contained in the Avro file
+    that contains those records. In this case, keys of each dictionary
+    will contain the corresponding field names and will be of type ``string``
+    while the values of the dictionary will be of the type defined in the
+    corresponding Avro schema.
+    For example, if the schema of the Avro file is the following.
+      {"namespace": "example.avro","type": "record","name": "User","fields":
+      [{"name": "name", "type": "string"},
+       {"name": "favorite_number",  "type": ["int", "null"]},
+       {"name": "favorite_color", "type": ["string", "null"]}]}
+    Then records generated by ``_AvroSource`` will be dictionaries of the
+    following form.
+      {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}.
 
+    Args:
+      label: label of the PTransform.
+      file_pattern: the set of files to be read.
+      min_bundle_size: the minimum size in bytes, to be considered when
+                       splitting the input into bundles.
+      **kwargs: Additional keyword arguments to be passed to the base class.
+    """
+    super(ReadFromAvro, self).__init__(label, **kwargs)
+
+    self._file_pattern = file_pattern
+    self._min_bundle_size = min_bundle_size
 
-class AvroSource(filebasedsource.FileBasedSource):
+  def apply(self, pcoll):
+    return pcoll.pipeline | Read(_AvroSource(
+        file_pattern=self._file_pattern, min_bundle_size=self._min_bundle_size))
+
+
+class _AvroSource(filebasedsource.FileBasedSource):
   """A source for reading Avro files.
 
-  ``AvroSource`` is implemented using the file-based source framework available
+  ``_AvroSource`` is implemented using the file-based source framework available
   in module 'filebasedsource'. Hence please refer to module 'filebasedsource'
   to fully understand how this source implements operations common to all
   file-based sources such as file-pattern expansion and splitting into bundles
   for parallel processing.
-
-  If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
-  files, a ``PCollection`` for the records in these Avro files can be created in
-  the following manner.
-
-    p = df.Pipeline(argv=pipeline_args)
-    records = p | df.io.Read('Read', avroio.AvroSource('/mypath/myavrofiles*'))
-
-  Each record of this ``PCollection`` will contain a Python dictionary that
-  complies with the schema contained in the Avro file that contains that
-  particular record.
-  Keys of each dictionary will contain the corresponding field names and will
-  be of type ``string``. Values of the dictionary will be of the type defined in
-  the corresponding Avro schema.
-
-  For example, if schema of the Avro file is following.
-  {"namespace": "example.avro","type": "record","name": "User","fields":
-  [{"name": "name", "type": "string"},
-   {"name": "favorite_number",  "type": ["int", "null"]},
-   {"name": "favorite_color", "type": ["string", "null"]}]}
-
-  Then records generated by ``AvroSource`` will be dictionaries of the following
-  form.
-  {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}).
   """
 
   def __init__(self, file_pattern, min_bundle_size=0):
-    super(AvroSource, self).__init__(file_pattern, min_bundle_size)
+    super(_AvroSource, self).__init__(file_pattern, min_bundle_size)
     self._avro_schema = None
     self._codec = None
     self._sync_marker = None
@@ -163,9 +190,9 @@ class AvroSource(filebasedsource.FileBasedSource):
     block_size = decoder.read_long()
 
     block_bytes = decoder.read(block_size)
-    return AvroSource.AvroBlock(block_bytes, num_records,
-                                self._avro_schema,
-                                self._codec, f.tell()) if block_bytes else None
+    return _AvroSource.AvroBlock(block_bytes, num_records,
+                                 self._avro_schema,
+                                 self._codec, f.tell()) if block_bytes else None
 
 
 class AvroUtils(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/36ea1071/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 060e211..51bc375 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -19,9 +19,13 @@ import logging
 import os
 import tempfile
 import unittest
-from apache_beam.io import avroio
+
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
+
+# Importing following private class for testing purposes.
+from apache_beam.io.avroio import _AvroSource as AvroSource
+
 import avro.datafile
 from avro.datafile import DataFileWriter
 from avro.io import DatumWriter
@@ -81,7 +85,7 @@ class TestAvro(unittest.TestCase):
 
   def _run_avro_test(
       self, pattern, desired_bundle_size, perform_splitting, expected_result):
-    source = avroio.AvroSource(pattern)
+    source = AvroSource(pattern)
 
     read_records = []
     if perform_splitting:
@@ -148,7 +152,7 @@ class TestAvro(unittest.TestCase):
     try:
       avro.datafile.SYNC_INTERVAL = 5
       file_name = self._write_data(count=20)
-      source = avroio.AvroSource(file_name)
+      source = AvroSource(file_name)
       splits = [split for split in source.split(
           desired_bundle_size=float('inf'))]
       assert len(splits) == 1


[2/2] incubator-beam git commit: Closes #672

Posted by dh...@apache.org.
Closes #672


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/71214b37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/71214b37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/71214b37

Branch: refs/heads/python-sdk
Commit: 71214b37ac681f01580c22301d4b3ed6191b6e23
Parents: ce29ac7 36ea107
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jul 19 21:30:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jul 19 21:30:38 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 125 ++++++++++++++++---
 .../examples/snippets/snippets_test.py          |  24 +++-
 sdks/python/apache_beam/io/avroio.py            |  87 ++++++++-----
 sdks/python/apache_beam/io/avroio_test.py       |  10 +-
 4 files changed, 191 insertions(+), 55 deletions(-)
----------------------------------------------------------------------