You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/11/21 00:32:22 UTC
[beam] branch master updated: [BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 51492f5 [BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)
51492f5 is described below
commit 51492f5e60d5f8a5317ac3e83cb9cef69c006d0b
Author: Simon Plovyt <40...@users.noreply.github.com>
AuthorDate: Wed Nov 21 01:32:16 2018 +0100
[BEAM-5315] Partially port IO: avro schema parsing and codecs (#6925)
* [BEAM-5315] Partially port IO: avro schema parsing and codecs
* Update the explicit definition of the writer and reader avro schemas.
---
sdks/python/apache_beam/io/avroio.py | 12 +++++++-----
sdks/python/apache_beam/io/filesystemio_test.py | 2 +-
sdks/python/apache_beam/io/tfrecordio_test.py | 3 +++
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 79c2505..b648ad2 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -330,14 +330,14 @@ class _AvroBlock(object):
@staticmethod
def _decompress_bytes(data, codec):
- if codec == 'null':
+ if codec == b'null':
return data
- elif codec == 'deflate':
+ elif codec == b'deflate':
# zlib.MAX_WBITS is the window size. '-' sign indicates that this is
# raw data (without headers). See zlib and Avro documentations for more
# details.
return zlib.decompress(data, -zlib.MAX_WBITS)
- elif codec == 'snappy':
+ elif codec == b'snappy':
# Snappy is an optional avro codec.
# See Snappy and Avro documentation for more details.
try:
@@ -360,8 +360,10 @@ class _AvroBlock(object):
def records(self):
decoder = avroio.BinaryDecoder(
io.BytesIO(self._decompressed_block_bytes))
- reader = avroio.DatumReader(
- writers_schema=self._schema, readers_schema=self._schema)
+
+ writer_schema = self._schema
+ reader_schema = self._schema
+ reader = avroio.DatumReader(writer_schema, reader_schema)
current_record = 0
while current_record < self._num_records:
diff --git a/sdks/python/apache_beam/io/filesystemio_test.py b/sdks/python/apache_beam/io/filesystemio_test.py
index f11e1c4..0a03881 100644
--- a/sdks/python/apache_beam/io/filesystemio_test.py
+++ b/sdks/python/apache_beam/io/filesystemio_test.py
@@ -177,7 +177,7 @@ class TestPipeStream(unittest.TestCase):
data_list.append(data)
bytes_read += len(data)
self.assertEqual(stream.tell(), bytes_read)
- self.assertEqual(''.join(data_list), expected)
+ self.assertEqual(b''.join(data_list), expected)
def test_pipe_stream(self):
block_sizes = list(4**i for i in range(0, 12))
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 6421fa5..07480b2 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -405,6 +405,9 @@ class TestReadAllFromTFRecord(unittest.TestCase):
assert_that(result, equal_to(['foo', 'bar']))
+@unittest.skipIf(sys.version_info[0] == 3,
+ 'This test still needs to be fixed on Python 3'
+ 'TODO: BEAM-5623 - several IO tests hang indefinitely')
class TestEnd2EndWriteAndRead(unittest.TestCase):
def create_inputs(self):