You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/26 19:17:51 UTC

[1/4] incubator-beam git commit: Implement avro sink.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7d988e3bb -> 7e744e445


Implement avro sink.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1090ca39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1090ca39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1090ca39

Branch: refs/heads/python-sdk
Commit: 1090ca3911faef84bbd61c6b41668a695d2fe8dc
Parents: 8bc965b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Sep 24 02:25:05 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 26 12:17:35 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py      | 100 ++++++++++++++++++++++++-
 sdks/python/apache_beam/io/avroio_test.py |  41 ++++++----
 2 files changed, 127 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1090ca39/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 82b30be..fdf8dae 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -20,15 +20,18 @@ import os
 import StringIO
 import zlib
 
+import avro
 from avro import datafile
 from avro import io as avroio
 from avro import schema
 
+import apache_beam as beam
 from apache_beam.io import filebasedsource
+from apache_beam.io import fileio
 from apache_beam.io.iobase import Read
 from apache_beam.transforms import PTransform
 
-__all__ = ['ReadFromAvro']
+__all__ = ['ReadFromAvro', 'WriteToAvro']
 
 
 class ReadFromAvro(PTransform):
@@ -242,3 +245,98 @@ class _AvroSource(filebasedsource.FileBasedSource):
                                                 sync_marker)
         for record in block.records():
           yield record
+
+
+_avro_codecs = {
+    fileio.CompressionTypes.UNCOMPRESSED: 'null',
+    fileio.CompressionTypes.ZLIB: 'deflate',
+    # fileio.CompressionTypes.SNAPPY: 'snappy',
+}
+
+
+class WriteToAvro(beam.transforms.PTransform):
+  """A ``PTransform`` for writing avro files."""
+
+  def __init__(self,
+               file_path_prefix,
+               schema,
+               file_name_suffix='',
+               num_shards=0,
+               shard_name_template=None,
+               mime_type='application/x-avro',
+               compression_type=fileio.CompressionTypes.ZLIB):
+    """Initialize a WriteToAvro transform.
+
+    Args:
+      file_path_prefix: The file path to write to. The files written will begin
+        with this prefix, followed by a shard identifier (see num_shards), and
+        end in a common extension, if given by file_name_suffix. In most cases,
+        only this argument is specified and num_shards, shard_name_template, and
+        file_name_suffix use default values.
+      schema: The schema to use, as returned by avro.schema.parse
+      file_name_suffix: Suffix for the files written.
+      append_trailing_newlines: indicate whether this sink should write an
+        additional newline char after writing each element.
+      num_shards: The number of files (shards) used for output. If not set, the
+        service will decide on the optimal number of shards.
+        Constraining the number of shards is likely to reduce
+        the performance of a pipeline.  Setting this value is not recommended
+        unless you require a specific number of output files.
+      shard_name_template: A template string containing placeholders for
+        the shard number and shard count. Currently only '' and
+        '-SSSSS-of-NNNNN' are patterns accepted by the service.
+        When constructing a filename for a particular shard number, the
+        upper-case letters 'S' and 'N' are replaced with the 0-padded shard
+        number and shard count respectively.  This argument can be '' in which
+        case it behaves as if num_shards was set to 1 and only one file will be
+        generated. The default pattern used is '-SSSSS-of-NNNNN'.
+      mime_type: The MIME type to use for the produced files, if the filesystem
+        supports specifying MIME types.
+      compression_type: Used to handle compressed output files. Defaults to
+        CompressionTypes.ZLIB
+
+    Returns:
+      A WriteToAvro transform usable for writing.
+    """
+    if compression_type not in _avro_codecs:
+      raise ValueError(
+          'Compression type %s not supported by avro.' % compression_type)
+    self.args = (file_path_prefix, schema, file_name_suffix, num_shards,
+                 shard_name_template, mime_type, compression_type)
+
+  def apply(self, pcoll):
+    # pylint: disable=expression-not-assigned
+    pcoll | beam.io.iobase.Write(_AvroSink(*self.args))
+
+
+class _AvroSink(fileio.FileSink):
+  """A sink to avro files."""
+
+  def __init__(self,
+               file_path_prefix,
+               schema,
+               file_name_suffix,
+               num_shards,
+               shard_name_template,
+               mime_type,
+               compression_type):
+    super(_AvroSink, self).__init__(
+        file_path_prefix,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        coder=None,
+        mime_type=mime_type,
+        # Compression happens at the block level, not the file level.
+        compression_type=fileio.CompressionTypes.UNCOMPRESSED)
+    self.schema = schema
+    self.avro_compression_type = compression_type
+
+  def open(self, temp_path):
+    file_handle = super(_AvroSink, self).open(temp_path)
+    return avro.datafile.DataFileWriter(
+        file_handle, avro.io.DatumWriter(), self.schema,
+        _avro_codecs[self.avro_compression_type])
+
+  def write_record(self, writer, value):
+    writer.append(value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1090ca39/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index e0c211f..dbaf6f3 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import json
 import logging
 import os
 import tempfile
@@ -33,7 +34,7 @@ from apache_beam.io.avroio import _AvroSource as AvroSource
 import avro.datafile
 from avro.datafile import DataFileWriter
 from avro.io import DatumWriter
-import avro.schema as avro_schema
+import avro.schema
 
 
 class TestAvro(unittest.TestCase):
@@ -67,25 +68,27 @@ class TestAvro(unittest.TestCase):
                                          'favorite_number': 6,
                                          'favorite_color': 'Green'}]
 
+  SCHEMA = avro.schema.parse('''
+  {"namespace": "example.avro",
+   "type": "record",
+   "name": "User",
+   "fields": [
+       {"name": "name", "type": "string"},
+       {"name": "favorite_number",  "type": ["int", "null"]},
+       {"name": "favorite_color", "type": ["string", "null"]}
+   ]
+  }
+  ''')
+
   def _write_data(self,
                   directory=None,
                   prefix=tempfile.template,
                   codec='null',
                   count=len(RECORDS)):
-    schema = ('{\"namespace\": \"example.avro\",'
-              '\"type\": \"record\",'
-              '\"name\": \"User\",'
-              '\"fields\": ['
-              '{\"name\": \"name\", \"type\": \"string\"},'
-              '{\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},'
-              '{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}'
-              ']}')
-
-    schema = avro_schema.parse(schema)
 
     with tempfile.NamedTemporaryFile(
         delete=False, dir=directory, prefix=prefix) as f:
-      writer = DataFileWriter(f, DatumWriter(), schema, codec=codec)
+      writer = DataFileWriter(f, DatumWriter(), self.SCHEMA, codec=codec)
       len_records = len(self.RECORDS)
       for i in range(count):
         writer.append(self.RECORDS[i % len_records])
@@ -227,11 +230,23 @@ class TestAvro(unittest.TestCase):
       source_test_utils.readFromSource(source, None, None)
       self.assertEqual(0, exn.exception.message.find('Unexpected sync marker'))
 
-  def test_pipeline(self):
+  def test_source_transform(self):
     path = self._write_data()
     with beam.Pipeline('DirectPipelineRunner') as p:
       assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS))
 
+  def test_sink_transform(self):
+    with tempfile.NamedTemporaryFile() as dst:
+      path = dst.name
+      with beam.Pipeline('DirectPipelineRunner') as p:
+        # pylint: disable=expression-not-assigned
+        p | beam.Create(self.RECORDS) | avroio.WriteToAvro(path, self.SCHEMA)
+      with beam.Pipeline('DirectPipelineRunner') as p:
+        # json used for stable sortability
+        readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
+        assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()


[2/4] incubator-beam git commit: Fix and add test for ReadFromAvro transform.

Posted by ro...@apache.org.
Fix and add test for ReadFromAvro transform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bc965b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bc965b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bc965b6

Branch: refs/heads/python-sdk
Commit: 8bc965b61becf5f92bcd3ff4468fa53fde5b6e6b
Parents: f9c565b
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Sep 24 01:26:31 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 26 12:17:35 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py      | 5 ++---
 sdks/python/apache_beam/io/avroio_test.py | 8 ++++++++
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc965b6/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 3415d22..82b30be 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -34,8 +34,7 @@ __all__ = ['ReadFromAvro']
 class ReadFromAvro(PTransform):
   """A ``PTransform`` for reading avro files."""
 
-  def __init__(self, label=None, file_pattern=None, min_bundle_size=0,
-               **kwargs):
+  def __init__(self, file_pattern=None, min_bundle_size=0):
     """Initializes ``ReadFromAvro``.
 
     Uses source '_AvroSource' to read a set of Avro files defined by a given
@@ -70,7 +69,7 @@ class ReadFromAvro(PTransform):
                        splitting the input into bundles.
       **kwargs: Additional keyword arguments to be passed to the base class.
     """
-    super(ReadFromAvro, self).__init__(label, **kwargs)
+    super(ReadFromAvro, self).__init__()
 
     self._file_pattern = file_pattern
     self._min_bundle_size = min_bundle_size

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc965b6/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index c21ed57..e0c211f 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -20,8 +20,12 @@ import os
 import tempfile
 import unittest
 
+import apache_beam as beam
+from apache_beam.io import avroio
 from apache_beam.io import filebasedsource
 from apache_beam.io import source_test_utils
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 
 # Importing following private class for testing purposes.
 from apache_beam.io.avroio import _AvroSource as AvroSource
@@ -223,6 +227,10 @@ class TestAvro(unittest.TestCase):
       source_test_utils.readFromSource(source, None, None)
       self.assertEqual(0, exn.exception.message.find('Unexpected sync marker'))
 
+  def test_pipeline(self):
+    path = self._write_data()
+    with beam.Pipeline('DirectPipelineRunner') as p:
+      assert_that(p | avroio.ReadFromAvro(path), equal_to(self.RECORDS))
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


[4/4] incubator-beam git commit: Closes #1000

Posted by ro...@apache.org.
Closes #1000


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e744e44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e744e44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e744e44

Branch: refs/heads/python-sdk
Commit: 7e744e4455cecb08870145ae100d2b8de8bd7134
Parents: 7d988e3 1090ca3
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Sep 26 12:17:36 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 26 12:17:36 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py      | 105 ++++++++++++++++++++++++-
 sdks/python/apache_beam/io/avroio_test.py |  56 ++++++++++---
 sdks/python/apache_beam/io/textio_test.py |   5 ++
 3 files changed, 150 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Cleanup temporary files in textio and avroio tests.

Posted by ro...@apache.org.
Cleanup temporary files in textio and avroio tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9c565b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9c565b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9c565b6

Branch: refs/heads/python-sdk
Commit: f9c565b6d9c2b7b11e6cba8f1f7354fd4e369193
Parents: 7d988e3
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Sat Sep 24 01:16:43 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 26 12:17:35 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio_test.py | 9 +++++++++
 sdks/python/apache_beam/io/textio_test.py | 5 +++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9c565b6/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py
index 29c4209..c21ed57 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -34,11 +34,19 @@ import avro.schema as avro_schema
 
 class TestAvro(unittest.TestCase):
 
+  _temp_files = []
+
   def setUp(self):
     # Reducing the size of thread pools. Without this test execution may fail in
     # environments with limited amount of resources.
     filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
 
+  def tearDown(self):
+    for path in self._temp_files:
+      if os.path.exists(path):
+        os.remove(path)
+    self._temp_files = []
+
   RECORDS = [{'name': 'Thomas',
               'favorite_number': 1,
               'favorite_color': 'blue'}, {'name': 'Henry',
@@ -79,6 +87,7 @@ class TestAvro(unittest.TestCase):
         writer.append(self.RECORDS[i % len_records])
       writer.close()
 
+      self._temp_files.append(f.name)
       return f.name
 
   def _write_pattern(self, num_files):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9c565b6/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 3fa0f9a..fef2b79 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -20,6 +20,7 @@
 import glob
 import gzip
 import logging
+import os
 import tempfile
 import unittest
 import zlib
@@ -330,6 +331,10 @@ class TextSinkTest(unittest.TestCase):
     self.lines = ['Line %d' % d for d in range(100)]
     self.path = tempfile.NamedTemporaryFile().name
 
+  def tearDown(self):
+    if os.path.exists(self.path):
+      os.remove(self.path)
+
   def _write_lines(self, sink, lines):
     f = sink.open(self.path)
     for line in lines: