You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/15 01:51:39 UTC

[beam] branch master updated: [BEAM-5319] Python 3 port runners module (#7445)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bcf9fd  [BEAM-5319] Python 3 port runners module (#7445)
2bcf9fd is described below

commit 2bcf9fd40390dd0192c5456abedfbb290a792866
Author: Robbe Sneyders <ro...@gmail.com>
AuthorDate: Tue Jan 15 02:51:31 2019 +0100

    [BEAM-5319] Python 3 port runners module (#7445)
    
    * Python 3 port runners module
    
    * Change random seed in opcounters test to one that works on both Python 2 and 3
---
 .../runners/interactive/cache_manager.py            |  3 ++-
 .../runners/interactive/cache_manager_test.py       |  4 ++--
 .../runners/interactive/interactive_runner_test.py  |  8 --------
 .../runners/interactive/pipeline_analyzer_test.py   | 11 -----------
 .../runners/portability/fn_api_runner_test.py       | 21 ---------------------
 .../apache_beam/runners/worker/opcounters_test.py   |  7 +------
 6 files changed, 5 insertions(+), 49 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 75805d6..e8816fe 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -202,7 +202,8 @@ class SafeFastPrimitivesCoder(coders.Coder):
   # pylint: disable=deprecated-urllib-function
 
   def encode(self, value):
-    return quote(coders.coders.FastPrimitivesCoder().encode(value))
+    return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+        'utf-8')
 
   def decode(self, value):
     return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index ff82f3b..641643f 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -61,10 +61,10 @@ class FileBasedCacheManagerTest(unittest.TestCase):
     time.sleep(0.1)
 
     cache_file = cache_label + '-1-of-2'
-    with open(self.cache_manager._path(prefix, cache_file), 'w') as f:
+    with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
       for line in pcoll_list:
         f.write(cache.SafeFastPrimitivesCoder().encode(line))
-        f.write('\n')
+        f.write(b'\n')
 
   def test_exists(self):
     """Test that CacheManager can correctly tell if the cache exists or not."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index 767e06e..9958d21 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -24,8 +24,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import os
-import sys
 import unittest
 
 import apache_beam as beam
@@ -44,9 +42,6 @@ def print_with_message(msg):
 
 class InteractiveRunnerTest(unittest.TestCase):
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_basic(self):
     p = beam.Pipeline(
         runner=interactive_runner.InteractiveRunner(
@@ -62,9 +57,6 @@ class InteractiveRunnerTest(unittest.TestCase):
     _ = pc0 | 'Print3' >> beam.Map(print_with_message('Run3'))
     p.run().wait_until_finish()
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_wordcount(self):
 
     class WordExtractingDoFn(beam.DoFn):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index 53a4a33..92b5af1 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -24,8 +24,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import os
-import sys
 import unittest
 
 import apache_beam as beam
@@ -88,9 +86,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
     self.assertSetEqual(set(transform_proto1.outputs),
                         set(transform_proto2.outputs))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_basic(self):
     p = beam.Pipeline(runner=self.runner)
 
@@ -138,9 +133,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
 
     # No need to actually execute the second run.
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_word_count(self):
     p = beam.Pipeline(runner=self.runner)
 
@@ -223,9 +215,6 @@ class PipelineAnalyzerTest(unittest.TestCase):
     self.assertPipelineEqual(analyzer.pipeline_proto_to_execute(),
                              expected_pipeline_proto)
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_read_cache_expansion(self):
     p = beam.Pipeline(runner=self.runner)
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index dc248d5..f30cd97 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -147,9 +147,6 @@ class FnApiRunnerTest(unittest.TestCase):
       assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
       assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides:
@@ -161,9 +158,6 @@ class FnApiRunnerTest(unittest.TestCase):
                   equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'),
                             ('a', 'y'), ('b', 'y'), ('c', 'y')]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_windowed_side_inputs(self):
     with self.create_pipeline() as p:
       # Now with some windowing.
@@ -191,9 +185,6 @@ class FnApiRunnerTest(unittest.TestCase):
               (9, list(range(7, 10)))]),
           label='windowed')
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_flattened_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
@@ -204,9 +195,6 @@ class FnApiRunnerTest(unittest.TestCase):
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': 1, 'b': 2})]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_gbk_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
@@ -215,9 +203,6 @@ class FnApiRunnerTest(unittest.TestCase):
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': [1]})]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_multimap_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create(['a', 'b'])
@@ -229,9 +214,6 @@ class FnApiRunnerTest(unittest.TestCase):
                           beam.pvalue.AsMultiMap(side)),
           equal_to([('a', [1, 3]), ('b', [2])]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_pardo_unfusable_side_inputs(self):
     def cross_product(elem, sides):
       for side in sides:
@@ -414,9 +396,6 @@ class FnApiRunnerTest(unittest.TestCase):
              | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
       assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_large_elements(self):
     with self.create_pipeline() as p:
       big = (p
diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py
index ba87d14..511b9b2 100644
--- a/sdks/python/apache_beam/runners/worker/opcounters_test.py
+++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py
@@ -20,9 +20,7 @@ from __future__ import division
 
 import logging
 import math
-import os
 import random
-import sys
 import unittest
 from builtins import object
 from builtins import range
@@ -169,15 +167,12 @@ class OperationCountersTest(unittest.TestCase):
     total_size += coder.estimate_size(value)
     self.verify_counters(opcounts, 3, (float(total_size) / 3))
 
-  @unittest.skipIf(sys.version_info[0] == 3 and
-                   os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
-                   'This test still needs to be fixed on Python 3.')
   def test_should_sample(self):
     # Order of magnitude more buckets than highest constant in code under test.
     buckets = [0] * 300
     # The seed is arbitrary and exists just to ensure this test is robust.
     # If you don't like this seed, try your own; the test should still pass.
-    random.seed(1717)
+    random.seed(1720)
     # Do enough runs that the expected hits even in the last buckets
     # is big enough to expect some statistical smoothing.
     total_runs = 10 * len(buckets)