You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/15 01:51:34 UTC

[beam] Diff for: [GitHub] aaltay merged pull request #7445: [BEAM-5319] Python 3 port runners module

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 75805d6826bc..e8816fe29773 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -202,7 +202,8 @@ class SafeFastPrimitivesCoder(coders.Coder):
   # pylint: disable=deprecated-urllib-function
 
   def encode(self, value):
-    return quote(coders.coders.FastPrimitivesCoder().encode(value))
+    return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+        'utf-8')
 
   def decode(self, value):
     return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index ff82f3b1b237..641643f7fa4b 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -61,10 +61,10 @@ def mock_write_cache(self, pcoll_list, prefix, cache_label):
     time.sleep(0.1)
 
     cache_file = cache_label + '-1-of-2'
-    with open(self.cache_manager._path(prefix, cache_file), 'w') as f:
+    with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
       for line in pcoll_list:
         f.write(cache.SafeFastPrimitivesCoder().encode(line))
-        f.write('\n')
+        f.write(b'\n')
 
   def test_exists(self):
     """Test that CacheManager can correctly tell if the cache exists or not."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 767e06e527fb..9958d218570b 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,8 +24,6 @@
 from __future__ import division
 from __future__ import print_function
 
-import os
-import sys
 import unittest
 
 import apache_beam as beam
@@ -44,9 +42,6 @@ def printer(elem):
 
 class InteractiveRunnerTest(unittest.TestCase):
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_basic(self):
     p = beam.Pipeline(
         runner=interactive_runner.InteractiveRunner(
@@ -62,9 +57,6 @@ def test_basic(self):
     _ = pc0 | 'Print3' >> beam.Map(print_with_message('Run3'))
     p.run().wait_until_finish()
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_wordcount(self):
 
     class WordExtractingDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index 53a4a33c9622..92b5af1097a4 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -24,8 +24,6 @@
 from __future__ import division
 from __future__ import print_function
 
-import os
-import sys
 import unittest
 
 import apache_beam as beam
@@ -88,9 +86,6 @@ def assertTransformEqual(self, pipeline_proto1, transform_id1,
     self.assertSetEqual(set(transform_proto1.outputs),
                         set(transform_proto2.outputs))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_basic(self):
     p = beam.Pipeline(runner=self.runner)
 
@@ -138,9 +133,6 @@ def test_basic(self):
 
     # No need to actually execute the second run.
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_word_count(self):
     p = beam.Pipeline(runner=self.runner)
 
@@ -223,9 +215,6 @@ def test_write_cache_expansion(self):
     self.assertPipelineEqual(analyzer.pipeline_proto_to_execute(),
                              expected_pipeline_proto)
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_read_cache_expansion(self):
     p = beam.Pipeline(runner=self.runner)
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index dc248d585706..f30cd9781ac5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -147,9 +147,6 @@ def even_odd(elem):
       assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
       assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides:
@@ -161,9 +158,6 @@ def cross_product(elem, sides):
                   equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'),
                             ('a', 'y'), ('b', 'y'), ('c', 'y')]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_windowed_side_inputs(self):
     with self.create_pipeline() as p:
       # Now with some windowing.
@@ -191,9 +185,6 @@ def test_pardo_windowed_side_inputs(self):
               (9, list(range(7, 10)))]),
           label='windowed')
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_flattened_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
@@ -204,9 +195,6 @@ def test_flattened_side_input(self):
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': 1, 'b': 2})]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_gbk_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
@@ -215,9 +203,6 @@ def test_gbk_side_input(self):
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': [1]})]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_multimap_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create(['a', 'b'])
@@ -229,9 +214,6 @@ def test_multimap_side_input(self):
                           beam.pvalue.AsMultiMap(side)),
           equal_to([('a', [1, 3]), ('b', [2])]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_unfusable_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides:
@@ -414,9 +396,6 @@ def test_windowing(self):
              | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
       assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_large_elements(self):
     with self.create_pipeline() as p:
       big = (p
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index ba87d14659cb..511b9b2099c2 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -20,9 +20,7 @@
 
 import logging
 import math
-import os
 import random
-import sys
 import unittest
 from builtins import object
 from builtins import range
@@ -169,15 +167,12 @@ def test_update_multiple(self):
     total_size += coder.estimate_size(value)
     self.verify_counters(opcounts, 3, (float(total_size) / 3))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_should_sample(self):
     # Order of magnitude more buckets than highest constant in code under test.
     buckets = [0] * 300
     # The seed is arbitrary and exists just to ensure this test is robust.
     # If you don't like this seed, try your own; the test should still pass.
-    random.seed(1717)
+    random.seed(1720)
     # Do enough runs that the expected hits even in the last buckets
     # is big enough to expect some statistical smoothing.
     total_runs = 10 * len(buckets)


With regards,
Apache Git Services