You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/15 01:51:34 UTC
[beam] Diff for: [GitHub] aaltay merged pull request #7445: [BEAM-5319] Python 3 port runners module
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 75805d6826bc..e8816fe29773 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -202,7 +202,8 @@ class SafeFastPrimitivesCoder(coders.Coder):
# pylint: disable=deprecated-urllib-function
def encode(self, value):
- return quote(coders.coders.FastPrimitivesCoder().encode(value))
+ return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+ 'utf-8')
def decode(self, value):
return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index ff82f3b1b237..641643f7fa4b 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -61,10 +61,10 @@ def mock_write_cache(self, pcoll_list, prefix, cache_label):
time.sleep(0.1)
cache_file = cache_label + '-1-of-2'
- with open(self.cache_manager._path(prefix, cache_file), 'w') as f:
+ with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
for line in pcoll_list:
f.write(cache.SafeFastPrimitivesCoder().encode(line))
- f.write('\n')
+ f.write(b'\n')
def test_exists(self):
"""Test that CacheManager can correctly tell if the cache exists or not."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 767e06e527fb..9958d218570b 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,8 +24,6 @@
from __future__ import division
from __future__ import print_function
-import os
-import sys
import unittest
import apache_beam as beam
@@ -44,9 +42,6 @@ def printer(elem):
class InteractiveRunnerTest(unittest.TestCase):
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_basic(self):
p = beam.Pipeline(
runner=interactive_runner.InteractiveRunner(
@@ -62,9 +57,6 @@ def test_basic(self):
_ = pc0 | 'Print3' >> beam.Map(print_with_message('Run3'))
p.run().wait_until_finish()
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_wordcount(self):
class WordExtractingDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index 53a4a33c9622..92b5af1097a4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -24,8 +24,6 @@
from __future__ import division
from __future__ import print_function
-import os
-import sys
import unittest
import apache_beam as beam
@@ -88,9 +86,6 @@ def assertTransformEqual(self, pipeline_proto1, transform_id1,
self.assertSetEqual(set(transform_proto1.outputs),
set(transform_proto2.outputs))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_basic(self):
p = beam.Pipeline(runner=self.runner)
@@ -138,9 +133,6 @@ def test_basic(self):
# No need to actually execute the second run.
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_word_count(self):
p = beam.Pipeline(runner=self.runner)
@@ -223,9 +215,6 @@ def test_write_cache_expansion(self):
self.assertPipelineEqual(analyzer.pipeline_proto_to_execute(),
expected_pipeline_proto)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_read_cache_expansion(self):
p = beam.Pipeline(runner=self.runner)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index dc248d585706..f30cd9781ac5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -147,9 +147,6 @@ def even_odd(elem):
assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
@@ -161,9 +158,6 @@ def cross_product(elem, sides):
equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'),
('a', 'y'), ('b', 'y'), ('c', 'y')]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_windowed_side_inputs(self):
with self.create_pipeline() as p:
# Now with some windowing.
@@ -191,9 +185,6 @@ def test_pardo_windowed_side_inputs(self):
(9, list(range(7, 10)))]),
label='windowed')
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_flattened_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
@@ -204,9 +195,6 @@ def test_flattened_side_input(self):
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': 1, 'b': 2})]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_gbk_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
@@ -215,9 +203,6 @@ def test_gbk_side_input(self):
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': [1]})]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_multimap_side_input(self):
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create(['a', 'b'])
@@ -229,9 +214,6 @@ def test_multimap_side_input(self):
beam.pvalue.AsMultiMap(side)),
equal_to([('a', [1, 3]), ('b', [2])]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_pardo_unfusable_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
@@ -414,9 +396,6 @@ def test_windowing(self):
| beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_large_elements(self):
with self.create_pipeline() as p:
big = (p
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index ba87d14659cb..511b9b2099c2 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -20,9 +20,7 @@
import logging
import math
-import os
import random
-import sys
import unittest
from builtins import object
from builtins import range
@@ -169,15 +167,12 @@ def test_update_multiple(self):
total_size += coder.estimate_size(value)
self.verify_counters(opcounts, 3, (float(total_size) / 3))
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3.')
def test_should_sample(self):
# Order of magnitude more buckets than highest constant in code under test.
buckets = [0] * 300
# The seed is arbitrary and exists just to ensure this test is robust.
# If you don't like this seed, try your own; the test should still pass.
- random.seed(1717)
+ random.seed(1720)
# Do enough runs that the expected hits even in the last buckets
# is big enough to expect some statistical smoothing.
total_runs = 10 * len(buckets)
With regards,
Apache Git Services