You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/02 19:45:48 UTC
[beam] branch master updated: [BEAM-5315] Python 3 port io.source* and io.concat_source* modules (#7383)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5cdf3a7 [BEAM-5315] Python 3 port io.source* and io.concat_source* modules (#7383)
5cdf3a7 is described below
commit 5cdf3a79d603decd28ef4006370f6310ce1cae98
Author: Robbe Sneyders <ro...@gmail.com>
AuthorDate: Wed Jan 2 20:45:40 2019 +0100
[BEAM-5315] Python 3 port io.source* and io.concat_source* modules (#7383)
* Python 3 port io.range_trackers
* Add io.restriction_trackers_test to Python 3 test suite
* Python 3 port io.source_test_utils module
* Add io.concat_source_test to Python 3 test suite
* Python 3 port io.sources_test module
---
.../apache_beam/io/source_test_utils_test.py | 29 ++--------------------
sdks/python/apache_beam/io/sources_test.py | 18 ++++----------
sdks/python/tox.ini | 2 +-
3 files changed, 8 insertions(+), 41 deletions(-)
diff --git a/sdks/python/apache_beam/io/source_test_utils_test.py b/sdks/python/apache_beam/io/source_test_utils_test.py
index 1dd09c0..6977d09 100644
--- a/sdks/python/apache_beam/io/source_test_utils_test.py
+++ b/sdks/python/apache_beam/io/source_test_utils_test.py
@@ -18,7 +18,6 @@
from __future__ import absolute_import
import logging
-import os
import sys
import tempfile
import unittest
@@ -40,12 +39,12 @@ class SourceTestUtilsTest(unittest.TestCase):
assert isinstance(lines, list)
with tempfile.NamedTemporaryFile(delete=False) as f:
for line in lines:
- f.write(line + '\n')
+ f.write(line + b'\n')
return f.name
def _create_data(self, num_lines):
- return ['line ' + str(i) for i in range(num_lines)]
+ return [b'line ' + str(i).encode('latin1') for i in range(num_lines)]
def _create_source(self, data):
source = LineSource(self._create_file_with_data(data))
@@ -55,20 +54,12 @@ class SourceTestUtilsTest(unittest.TestCase):
for bundle in source.split(float('inf')):
return bundle.source
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_read_from_source(self):
data = self._create_data(100)
source = self._create_source(data)
self.assertCountEqual(
data, source_test_utils.read_from_source(source, None, None))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_source_equals_reference_source(self):
data = self._create_data(100)
reference_source = self._create_source(data)
@@ -82,10 +73,6 @@ class SourceTestUtilsTest(unittest.TestCase):
source_test_utils.assert_sources_equal_reference_source(
(reference_source, None, None), sources_info)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_split_at_fraction_successful(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -110,10 +97,6 @@ class SourceTestUtilsTest(unittest.TestCase):
self.assertTrue(result1[0] < result3[0])
self.assertTrue(result1[1] > result3[1])
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_split_at_fraction_fails(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -127,10 +110,6 @@ class SourceTestUtilsTest(unittest.TestCase):
source_test_utils.assert_split_at_fraction_behavior(
source, 10, 0.5, source_test_utils.ExpectedSplitOutcome.MUST_FAIL)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_split_at_fraction_binary(self):
data = self._create_data(100)
source = self._create_source(data)
@@ -143,10 +122,6 @@ class SourceTestUtilsTest(unittest.TestCase):
self.assertTrue(stats.successful_fractions)
self.assertTrue(stats.non_trivial_fractions)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_split_at_fraction_exhaustive(self):
data = self._create_data(10)
source = self._create_source(data)
diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py
index 1e7de57..318508a 100644
--- a/sdks/python/apache_beam/io/sources_test.py
+++ b/sdks/python/apache_beam/io/sources_test.py
@@ -53,7 +53,7 @@ class LineSource(iobase.BoundedSource):
for line in f:
if not range_tracker.try_claim(current):
return
- yield line.rstrip('\n')
+ yield line.rstrip(b'\n')
current += len(line)
def split(self, desired_bundle_size, start_position=None, stop_position=None):
@@ -95,28 +95,20 @@ class SourcesTest(unittest.TestCase):
f.write(contents)
return f.name
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_read_from_source(self):
- file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
+ file_name = self._create_temp_file(b'aaaa\nbbbb\ncccc\ndddd')
source = LineSource(file_name)
range_tracker = source.get_range_tracker(None, None)
result = [line for line in source.read(range_tracker)]
- self.assertCountEqual(['aaaa', 'bbbb', 'cccc', 'dddd'], result)
+ self.assertCountEqual([b'aaaa', b'bbbb', b'cccc', b'dddd'], result)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-5627')
def test_run_direct(self):
- file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd')
+ file_name = self._create_temp_file(b'aaaa\nbbbb\ncccc\ndddd')
pipeline = TestPipeline()
pcoll = pipeline | beam.io.Read(LineSource(file_name))
- assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd']))
+ assert_that(pcoll, equal_to([b'aaaa', b'bbbb', b'cccc', b'dddd']))
pipeline.run()
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 3fe1d78..a421a36 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -58,7 +58,7 @@ setenv =
BEAM_EXPERIMENTAL_PY3=1
RUN_SKIPPED_PY3_TESTS=0
modules =
- apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
+ apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
commands =
python --version
pip --version