You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/11/21 00:32:22 UTC

[beam] branch master updated: [BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 51492f5  [BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)
51492f5 is described below

commit 51492f5e60d5f8a5317ac3e83cb9cef69c006d0b
Author: Simon Plovyt <40...@users.noreply.github.com>
AuthorDate: Wed Nov 21 01:32:16 2018 +0100

    [BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)
    
    * [BEAM-5315] Partially port IO: avro schema parsing and codecs
    * Update the explicit definition of the writer and reader avro schemas.
---
 sdks/python/apache_beam/io/avroio.py            | 12 +++++++-----
 sdks/python/apache_beam/io/filesystemio_test.py |  2 +-
 sdks/python/apache_beam/io/tfrecordio_test.py   |  3 +++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 79c2505..b648ad2 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -330,14 +330,14 @@ class _AvroBlock(object):
 
   @staticmethod
   def _decompress_bytes(data, codec):
-    if codec == 'null':
+    if codec == b'null':
       return data
-    elif codec == 'deflate':
+    elif codec == b'deflate':
       # zlib.MAX_WBITS is the window size. The '-' sign indicates that this is
       # raw data (without headers). See the zlib and Avro documentation for
       # more details.
       return zlib.decompress(data, -zlib.MAX_WBITS)
-    elif codec == 'snappy':
+    elif codec == b'snappy':
       # Snappy is an optional avro codec.
       # See Snappy and Avro documentation for more details.
       try:
@@ -360,8 +360,10 @@ class _AvroBlock(object):
   def records(self):
     decoder = avroio.BinaryDecoder(
         io.BytesIO(self._decompressed_block_bytes))
-    reader = avroio.DatumReader(
-        writers_schema=self._schema, readers_schema=self._schema)
+
+    writer_schema = self._schema
+    reader_schema = self._schema
+    reader = avroio.DatumReader(writer_schema, reader_schema)
 
     current_record = 0
     while current_record < self._num_records:
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index f11e1c4..0a03881 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -177,7 +177,7 @@ class TestPipeStream(unittest.TestCase):
       data_list.append(data)
       bytes_read += len(data)
       self.assertEqual(stream.tell(), bytes_read)
-    self.assertEqual(''.join(data_list), expected)
+    self.assertEqual(b''.join(data_list), expected)
 
   def test_pipe_stream(self):
     block_sizes = list(4**i for i in range(0, 12))
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 6421fa5..07480b2 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -405,6 +405,9 @@ class TestReadAllFromTFRecord(unittest.TestCase):
         assert_that(result, equal_to(['foo', 'bar']))
 
 
+@unittest.skipIf(sys.version_info[0] == 3,
+                 'This test still needs to be fixed on Python 3'
+                 'TODO: BEAM-5623 - several IO tests hang indefinitely')
 class TestEnd2EndWriteAndRead(unittest.TestCase):
 
   def create_inputs(self):