You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/02 22:19:08 UTC
[beam] branch master updated: [BEAM-5315] Python 3 port io.filebased_* modules (#7386)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b039e4 [BEAM-5315] Python 3 port io.filebased_* modules (#7386)
4b039e4 is described below
commit 4b039e4bb36f2a59a0ac8f0ff0548ce6f772d012
Author: Robbe Sneyders <ro...@gmail.com>
AuthorDate: Wed Jan 2 23:18:59 2019 +0100
[BEAM-5315] Python 3 port io.filebased_* modules (#7386)
* Add apache_beam.io.filebasedsink_test to Python 3 test suite
* Python 3 port filebasedsource
---
sdks/python/apache_beam/io/filebasedsource_test.py | 20 ++++++++------------
sdks/python/tox.ini | 2 +-
2 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 4056827..7964a61 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -52,10 +52,6 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
class LineSource(FileBasedSource):
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def read_records(self, file_name, range_tracker):
f = self.open_file(file_name)
try:
@@ -445,7 +441,7 @@ class TestFileBasedSource(unittest.TestCase):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
with bz2.BZ2File(filename, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
@@ -460,7 +456,7 @@ class TestFileBasedSource(unittest.TestCase):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template).name
with gzip.GzipFile(filename, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
@@ -478,7 +474,7 @@ class TestFileBasedSource(unittest.TestCase):
for c in chunks:
compressobj = bz2.BZ2Compressor()
compressed_chunks.append(
- compressobj.compress('\n'.join(c)) + compressobj.flush())
+ compressobj.compress(b'\n'.join(c)) + compressobj.flush())
file_pattern = write_prepared_pattern(compressed_chunks)
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
@@ -496,7 +492,7 @@ class TestFileBasedSource(unittest.TestCase):
for c in chunks:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
- f.write('\n'.join(c))
+ f.write(b'\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(compressed_chunks)
pipeline = TestPipeline()
@@ -512,7 +508,7 @@ class TestFileBasedSource(unittest.TestCase):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template, suffix='.bz2').name
with bz2.BZ2File(filename, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
@@ -526,7 +522,7 @@ class TestFileBasedSource(unittest.TestCase):
filename = tempfile.NamedTemporaryFile(
delete=False, prefix=tempfile.template, suffix='.gz').name
with gzip.GzipFile(filename, 'wb') as f:
- f.write('\n'.join(lines))
+ f.write(b'\n'.join(lines))
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> beam.io.Read(LineSource(
@@ -543,7 +539,7 @@ class TestFileBasedSource(unittest.TestCase):
for c in chunks:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
- f.write('\n'.join(c))
+ f.write(b'\n'.join(c))
compressed_chunks.append(out.getvalue())
file_pattern = write_prepared_pattern(
compressed_chunks, suffixes=['.gz']*len(chunks))
@@ -563,7 +559,7 @@ class TestFileBasedSource(unittest.TestCase):
if i%2 == 0:
out = io.BytesIO()
with gzip.GzipFile(fileobj=out, mode="wb") as f:
- f.write('\n'.join(c))
+ f.write(b'\n'.join(c))
chunks_to_write.append(out.getvalue())
else:
chunks_to_write.append(b'\n'.join(c))
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index a421a36..9c0dd29 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -58,7 +58,7 @@ setenv =
BEAM_EXPERIMENTAL_PY3=1
RUN_SKIPPED_PY3_TESTS=0
modules =
- apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
+ apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
commands =
python --version
pip --version