You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/15 01:51:39 UTC
[beam] branch master updated: [BEAM-5319] Python 3 port runners
module (#7445)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2bcf9fd [BEAM-5319] Python 3 port runners module (#7445)
2bcf9fd is described below
commit 2bcf9fd40390dd0192c5456abedfbb290a792866
Author: Robbe Sneyders <ro...@gmail.com>
AuthorDate: Tue Jan 15 02:51:31 2019 +0100
[BEAM-5319] Python 3 port runners module (#7445)
* Python 3 port runners module
* Change random seed in opcounters test to one that works on both Python 2 and 3
---
.../runners/interactive/cache_manager.py | 3 ++-
.../runners/interactive/cache_manager_test.py | 4 ++--
.../runners/interactive/interactive_runner_test.py | 8 --------
.../runners/interactive/pipeline_analyzer_test.py | 11 -----------
.../runners/portability/fn_api_runner_test.py | 21 ---------------------
.../apache_beam/runners/worker/opcounters_test.py | 7 +------
6 files changed, 5 insertions(+), 49 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 75805d6..e8816fe 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -202,7 +202,8 @@ class SafeFastPrimitivesCoder(coders.Coder):
# pylint: disable=deprecated-urllib-function
def encode(self, value):
- return quote(coders.coders.FastPrimitivesCoder().encode(value))
+ return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+ 'utf-8')
def decode(self, value):
return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index ff82f3b..641643f 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -61,10 +61,10 @@ class FileBasedCacheManagerTest(unittest.TestCase):
time.sleep(0.1)
cache_file = cache_label + '-1-of-2'
- with open(self.cache_manager._path(prefix, cache_file), 'w') as f:
+ with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
for line in pcoll_list:
f.write(cache.SafeFastPrimitivesCoder().encode(line))
- f.write('\n')
+ f.write(b'\n')
def test_exists(self):
"""Test that CacheManager can correctly tell if the cache exists or not."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 767e06e..9958d21 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,8 +24,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import os
-import sys
import unittest
import apache_beam as beam
@@ -44,9 +42,6 @@ def print_with_message(msg):
class InteractiveRunnerTest(unittest.TestCase):
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_basic(self):
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(
@@ -62,9 +57,6 @@ class InteractiveRunnerTest(unittest.TestCase):
_ = pc0 | 'Print3' >> beam.Map(print_with_message('Run3'))
p.run().wait_until_finish()
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_wordcount(self):
class WordExtractingDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index 53a4a33..92b5af1 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -24,8 +24,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import os
-import sys
import unittest
import apache_beam as beam
@@ -88,9 +86,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
self.assertSetEqual(set(transform_proto1.outputs),
set(transform_proto2.outputs))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_basic(self):
p = beam.Pipeline(runner=self.runner)
@@ -138,9 +133,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
# No need to actually execute the second run.
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_word_count(self):
p = beam.Pipeline(runner=self.runner)
@@ -223,9 +215,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
self.assertPipelineEqual(analyzer.pipeline_proto_to_execute(),
expected_pipeline_proto)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_read_cache_expansion(self):
p = beam.Pipeline(runner=self.runner)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index dc248d5..f30cd97 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -147,9 +147,6 @@ class FnApiRunnerTest(unittest.TestCase):
assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
@@ -161,9 +158,6 @@ class FnApiRunnerTest(unittest.TestCase):
equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'),
('a', 'y'), ('b', 'y'), ('c', 'y')]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_windowed_side_inputs(self):
with self.create_pipeline() as p:
# Now with some windowing.
@@ -191,9 +185,6 @@ class FnApiRunnerTest(unittest.TestCase):
(9, list(range(7, 10)))]),
label='windowed')
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_flattened_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
@@ -204,9 +195,6 @@ class FnApiRunnerTest(unittest.TestCase):
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': 1, 'b': 2})]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_gbk_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
@@ -215,9 +203,6 @@ class FnApiRunnerTest(unittest.TestCase):
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': [1]})]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_multimap_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create(['a', 'b'])
@@ -229,9 +214,6 @@ class FnApiRunnerTest(unittest.TestCase):
beam.pvalue.AsMultiMap(side)),
equal_to([('a', [1, 3]), ('b', [2])]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_unfusable_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
@@ -414,9 +396,6 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_large_elements(self):
with self.create_pipeline() as p:
big = (p
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index ba87d14..511b9b2 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -20,9 +20,7 @@ from __future__ import division
import logging
import math
-import os
import random
-import sys
import unittest
from builtins import object
from builtins import range
@@ -169,15 +167,12 @@ class OperationCountersTest(unittest.TestCase):
total_size += coder.estimate_size(value)
self.verify_counters(opcounts, 3, (float(total_size) / 3))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_should_sample(self):
# Order of magnitude more buckets than highest constant in code under test.
buckets = [0] * 300
# The seed is arbitrary and exists just to ensure this test is robust.
# If you don't like this seed, try your own; the test should still pass.
- random.seed(1717)
+ random.seed(1720)
# Do enough runs that the expected hits even in the last buckets
# is big enough to expect some statistical smoothing.
total_runs = 10 * len(buckets)