You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/30 01:32:18 UTC

[beam] branch master updated: Python 3 port examples modules

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aea1744  Python 3 port examples modules
     new 39c4320  Merge pull request #7583 from RobbeSneyders/examples
aea1744 is described below

commit aea1744513cca6ad655c7aa0bccdc61b69786a44
Author: robbe <ro...@ml6.eu>
AuthorDate: Tue Jan 29 16:49:31 2019 +0100

    Python 3 port examples modules
---
 .../apache_beam/examples/complete/tfidf_test.py    |  4 ++--
 .../examples/cookbook/group_with_coder.py          |  3 ++-
 .../examples/cookbook/group_with_coder_test.py     |  5 +++--
 .../examples/cookbook/mergecontacts_test.py        |  2 +-
 .../cookbook/multiple_output_pardo_test.py         |  2 +-
 .../apache_beam/examples/snippets/snippets_test.py | 23 +++++++++++-----------
 .../examples/wordcount_debugging_test.py           |  2 +-
 .../apache_beam/examples/wordcount_minimal_test.py |  2 +-
 sdks/python/apache_beam/examples/wordcount_test.py |  2 +-
 sdks/python/apache_beam/testing/util.py            | 21 +++++++++++++++++---
 10 files changed, 42 insertions(+), 24 deletions(-)

diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 3e54101..2580d68 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -41,7 +41,7 @@ EXPECTED_RESULTS = set([
     ('def', '2.txt', 0.2027325540540822)])
 
 
-EXPECTED_LINE_RE = r'\(u\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
+EXPECTED_LINE_RE = r'\(u?\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
 
 
 class TfIdfTest(unittest.TestCase):
@@ -49,7 +49,7 @@ class TfIdfTest(unittest.TestCase):
   def create_file(self, path, contents):
     logging.info('Creating temp file: %s', path)
     with open(path, 'wb') as f:
-      f.write(contents)
+      f.write(contents.encode('utf-8'))
 
   def test_tfidf_transform(self):
     with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index f9c3788..a953fb3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -55,10 +55,11 @@ class PlayerCoder(coders.Coder):
   def encode(self, o):
     """Encode to bytes with a trace that coder was used."""
     # Our encoding prepends an 'x:' prefix.
-    return 'x:%s' % str(o.name)
+    return b'x:%s' % str(o.name).encode('utf-8')
 
   def decode(self, s):
     # To decode, we strip off the prepended 'x:' prefix.
+    s = s.decode('utf-8')
     assert s[0:2] == 'x:'
     return Player(s[2:])
 
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 522333e..73d7377 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -28,7 +28,8 @@ from apache_beam.testing.util import open_shards
 
 # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
 # used, we do not strip the prepended 'x:' string when decoding a Player object.
-group_with_coder.PlayerCoder.decode = lambda self, s: group_with_coder.Player(s)
+group_with_coder.PlayerCoder.decode = lambda self, s: group_with_coder.Player(
+    s.decode('utf-8'))
 
 
 class GroupWithCoderTest(unittest.TestCase):
@@ -41,7 +42,7 @@ class GroupWithCoderTest(unittest.TestCase):
   def create_temp_file(self, records):
     with tempfile.NamedTemporaryFile(delete=False) as f:
       for record in records:
-        f.write('%s\n' % record)
+        f.write(b'%s\n' % record.encode('utf-8'))
       return f.name
 
   def test_basics_with_type_check(self):
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index 8c7144b..0c2bc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -87,7 +87,7 @@ class MergeContactsTest(unittest.TestCase):
 
   def create_temp_file(self, contents):
     with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
+      f.write(contents.encode('utf-8'))
       return f.name
 
   def normalize_tsv_results(self, tsv_data):
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 78e02e7..6f7aa9f 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -37,7 +37,7 @@ class MultipleOutputParDo(unittest.TestCase):
 
   def create_temp_file(self, contents):
     with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
+      f.write(contents.encode('utf-8'))
       return f.name
 
   def get_wordcount_results(self, result_path):
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7a3e516..d6ac1f5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -373,10 +373,10 @@ class TypeHintsTest(unittest.TestCase):
 
       class PlayerCoder(beam.coders.Coder):
         def encode(self, player):
-          return '%s:%s' % (player.team, player.name)
+          return ('%s:%s' % (player.team, player.name)).encode('utf-8')
 
         def decode(self, s):
-          return Player(*s.split(':'))
+          return Player(*s.decode('utf-8').split(':'))
 
         def is_deterministic(self):
           return True
@@ -428,14 +428,14 @@ class SnippetsTest(unittest.TestCase):
         assert self.file_to_read
         for file_name in glob.glob(self.file_to_read):
           if self.compression_type is None:
-            with open(file_name) as file:
+            with open(file_name, 'rb') as file:
               for record in file:
-                value = self.coder.decode(record.rstrip('\n'))
+                value = self.coder.decode(record.rstrip(b'\n'))
                 yield WindowedValue(value, -1, [window.GlobalWindow()])
           else:
-            with gzip.open(file_name, 'r') as file:
+            with gzip.open(file_name, 'rb') as file:
               for record in file:
-                value = self.coder.decode(record.rstrip('\n'))
+                value = self.coder.decode(record.rstrip(b'\n'))
                 yield WindowedValue(value, -1, [window.GlobalWindow()])
 
     def expand(self, pcoll):
@@ -461,11 +461,11 @@ class SnippetsTest(unittest.TestCase):
       def start_bundle(self):
         assert self.file_to_write
         # Appending a UUID to create a unique file object per invocation.
-        self.file_obj = open(self.file_to_write + str(uuid.uuid4()), 'w')
+        self.file_obj = open(self.file_to_write + str(uuid.uuid4()), 'wb')
 
       def process(self, element):
         assert self.file_obj
-        self.file_obj.write(self.coder.encode(element) + '\n')
+        self.file_obj.write(self.coder.encode(element) + b'\n')
 
       def finish_bundle(self):
         assert self.file_obj
@@ -499,7 +499,7 @@ class SnippetsTest(unittest.TestCase):
 
   def create_temp_file(self, contents=''):
     with tempfile.NamedTemporaryFile(delete=False) as f:
-      f.write(contents)
+      f.write(contents.encode('utf-8'))
       self.temp_files.append(f.name)
       return f.name
 
@@ -569,7 +569,8 @@ class SnippetsTest(unittest.TestCase):
         file_name = self._tmp_dir + os.sep + table_name
         assert os.path.exists(file_name)
         with open(file_name, 'ab') as f:
-          f.write(key + ':' + value + os.linesep)
+          content = (key + ':' + value + os.linesep).encode('utf-8')
+          f.write(content)
 
       def rename_table(self, access_token, old_name, new_name):
         assert access_token == self._dummy_token
@@ -623,7 +624,7 @@ class SnippetsTest(unittest.TestCase):
   def test_model_textio_compressed(self):
     temp_path = self.create_temp_file('aa\nbb\ncc')
     gzip_file_name = temp_path + '.gz'
-    with open(temp_path) as src, gzip.open(gzip_file_name, 'wb') as dst:
+    with open(temp_path, 'rb') as src, gzip.open(gzip_file_name, 'wb') as dst:
       dst.writelines(src)
       # Add the temporary gzip file to be cleaned up as well.
       self.temp_files.append(gzip_file_name)
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
index 3e5afed..cf824d1 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
@@ -41,7 +41,7 @@ class WordCountTest(unittest.TestCase):
     results = []
     with open_shards(temp_path + '.result-*-of-*') as result_file:
       for line in result_file:
-        match = re.search(r'([A-Za-z]+): ([0-9]+)', line.decode('utf-8'))
+        match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
         if match is not None:
           results.append((match.group(1), int(match.group(2))))
     return results
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
index c79bb0a..011e678 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
@@ -51,7 +51,7 @@ class WordCountMinimalTest(unittest.TestCase):
     results = []
     with open_shards(temp_path + '.result-*-of-*') as result_file:
       for line in result_file:
-        match = re.search(r'([a-z]+): ([0-9]+)', line.decode('utf-8'))
+        match = re.search(r'([a-z]+): ([0-9]+)', line)
         if match is not None:
           results.append((match.group(1), int(match.group(2))))
     self.assertEqual(sorted(results), sorted(expected_words.items()))
diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py
index 1c36c04..aa131cb 100644
--- a/sdks/python/apache_beam/examples/wordcount_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_test.py
@@ -52,7 +52,7 @@ class WordCountTest(unittest.TestCase):
     results = []
     with open_shards(temp_path + '.result-*-of-*') as result_file:
       for line in result_file:
-        match = re.search(r'(\S+): ([0-9]+)', line.decode('utf-8'))
+        match = re.search(r'(\S+): ([0-9]+)', line)
         if match is not None:
           results.append((match.group(1), int(match.group(2))))
     self.assertEqual(sorted(results), sorted(expected_words.items()))
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 4dbca4d..7e68540 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 
 import collections
 import glob
+import io
 import tempfile
 from builtins import object
 
@@ -213,11 +214,25 @@ def assert_that(actual, matcher, label='assert_that',
 
 
 @experimental()
-def open_shards(glob_pattern):
-  """Returns a composite file of all shards matching the given glob pattern."""
+def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
+  """Returns a composite file of all shards matching the given glob pattern.
+
+  Args:
+    glob_pattern (str): Pattern used to match files which should be opened.
+    mode (str): Specify the mode in which the file should be opened. For
+                available modes, check io.open() documentation.
+    encoding (str): Name of the encoding used to decode or encode the file.
+                    This should only be used in text mode.
+
+  Returns:
+    A stream with the contents of the opened files.
+  """
+  if 'b' in mode:
+    encoding = None
+
   with tempfile.NamedTemporaryFile(delete=False) as out_file:
     for shard in glob.glob(glob_pattern):
       with open(shard, 'rb') as in_file:
         out_file.write(in_file.read())
     concatenated_file_name = out_file.name
-  return open(concatenated_file_name, 'rb')
+  return io.open(concatenated_file_name, mode, encoding=encoding)