You are viewing a plain text version of this content. (The canonical link that originally followed was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by al...@apache.org on 2019/01/30 01:32:18 UTC
[beam] branch master updated: Python 3 port examples modules
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aea1744 Python 3 port examples modules
new 39c4320 Merge pull request #7583 from RobbeSneyders/examples
aea1744 is described below
commit aea1744513cca6ad655c7aa0bccdc61b69786a44
Author: robbe <ro...@ml6.eu>
AuthorDate: Tue Jan 29 16:49:31 2019 +0100
Python 3 port examples modules
---
.../apache_beam/examples/complete/tfidf_test.py | 4 ++--
.../examples/cookbook/group_with_coder.py | 3 ++-
.../examples/cookbook/group_with_coder_test.py | 5 +++--
.../examples/cookbook/mergecontacts_test.py | 2 +-
.../cookbook/multiple_output_pardo_test.py | 2 +-
.../apache_beam/examples/snippets/snippets_test.py | 23 +++++++++++-----------
.../examples/wordcount_debugging_test.py | 2 +-
.../apache_beam/examples/wordcount_minimal_test.py | 2 +-
sdks/python/apache_beam/examples/wordcount_test.py | 2 +-
sdks/python/apache_beam/testing/util.py | 21 +++++++++++++++++---
10 files changed, 42 insertions(+), 24 deletions(-)
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py
index 3e54101..2580d68 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py
@@ -41,7 +41,7 @@ EXPECTED_RESULTS = set([
('def', '2.txt', 0.2027325540540822)])
-EXPECTED_LINE_RE = r'\(u\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
+EXPECTED_LINE_RE = r'\(u?\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
class TfIdfTest(unittest.TestCase):
@@ -49,7 +49,7 @@ class TfIdfTest(unittest.TestCase):
def create_file(self, path, contents):
logging.info('Creating temp file: %s', path)
with open(path, 'wb') as f:
- f.write(contents)
+ f.write(contents.encode('utf-8'))
def test_tfidf_transform(self):
with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index f9c3788..a953fb3 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -55,10 +55,11 @@ class PlayerCoder(coders.Coder):
def encode(self, o):
"""Encode to bytes with a trace that coder was used."""
# Our encoding prepends an 'x:' prefix.
- return 'x:%s' % str(o.name)
+ return b'x:%s' % str(o.name).encode('utf-8')
def decode(self, s):
# To decode, we strip off the prepended 'x:' prefix.
+ s = s.decode('utf-8')
assert s[0:2] == 'x:'
return Player(s[2:])
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 522333e..73d7377 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -28,7 +28,8 @@ from apache_beam.testing.util import open_shards
# Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
# used, we do not strip the prepended 'x:' string when decoding a Player object.
-group_with_coder.PlayerCoder.decode = lambda self, s: group_with_coder.Player(s)
+group_with_coder.PlayerCoder.decode = lambda self, s: group_with_coder.Player(
+ s.decode('utf-8'))
class GroupWithCoderTest(unittest.TestCase):
@@ -41,7 +42,7 @@ class GroupWithCoderTest(unittest.TestCase):
def create_temp_file(self, records):
with tempfile.NamedTemporaryFile(delete=False) as f:
for record in records:
- f.write('%s\n' % record)
+ f.write(b'%s\n' % record.encode('utf-8'))
return f.name
def test_basics_with_type_check(self):
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index 8c7144b..0c2bc47 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -87,7 +87,7 @@ class MergeContactsTest(unittest.TestCase):
def create_temp_file(self, contents):
with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
+ f.write(contents.encode('utf-8'))
return f.name
def normalize_tsv_results(self, tsv_data):
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 78e02e7..6f7aa9f 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -37,7 +37,7 @@ class MultipleOutputParDo(unittest.TestCase):
def create_temp_file(self, contents):
with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
+ f.write(contents.encode('utf-8'))
return f.name
def get_wordcount_results(self, result_path):
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 7a3e516..d6ac1f5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -373,10 +373,10 @@ class TypeHintsTest(unittest.TestCase):
class PlayerCoder(beam.coders.Coder):
def encode(self, player):
- return '%s:%s' % (player.team, player.name)
+ return ('%s:%s' % (player.team, player.name)).encode('utf-8')
def decode(self, s):
- return Player(*s.split(':'))
+ return Player(*s.decode('utf-8').split(':'))
def is_deterministic(self):
return True
@@ -428,14 +428,14 @@ class SnippetsTest(unittest.TestCase):
assert self.file_to_read
for file_name in glob.glob(self.file_to_read):
if self.compression_type is None:
- with open(file_name) as file:
+ with open(file_name, 'rb') as file:
for record in file:
- value = self.coder.decode(record.rstrip('\n'))
+ value = self.coder.decode(record.rstrip(b'\n'))
yield WindowedValue(value, -1, [window.GlobalWindow()])
else:
- with gzip.open(file_name, 'r') as file:
+ with gzip.open(file_name, 'rb') as file:
for record in file:
- value = self.coder.decode(record.rstrip('\n'))
+ value = self.coder.decode(record.rstrip(b'\n'))
yield WindowedValue(value, -1, [window.GlobalWindow()])
def expand(self, pcoll):
@@ -461,11 +461,11 @@ class SnippetsTest(unittest.TestCase):
def start_bundle(self):
assert self.file_to_write
# Appending a UUID to create a unique file object per invocation.
- self.file_obj = open(self.file_to_write + str(uuid.uuid4()), 'w')
+ self.file_obj = open(self.file_to_write + str(uuid.uuid4()), 'wb')
def process(self, element):
assert self.file_obj
- self.file_obj.write(self.coder.encode(element) + '\n')
+ self.file_obj.write(self.coder.encode(element) + b'\n')
def finish_bundle(self):
assert self.file_obj
@@ -499,7 +499,7 @@ class SnippetsTest(unittest.TestCase):
def create_temp_file(self, contents=''):
with tempfile.NamedTemporaryFile(delete=False) as f:
- f.write(contents)
+ f.write(contents.encode('utf-8'))
self.temp_files.append(f.name)
return f.name
@@ -569,7 +569,8 @@ class SnippetsTest(unittest.TestCase):
file_name = self._tmp_dir + os.sep + table_name
assert os.path.exists(file_name)
with open(file_name, 'ab') as f:
- f.write(key + ':' + value + os.linesep)
+ content = (key + ':' + value + os.linesep).encode('utf-8')
+ f.write(content)
def rename_table(self, access_token, old_name, new_name):
assert access_token == self._dummy_token
@@ -623,7 +624,7 @@ class SnippetsTest(unittest.TestCase):
def test_model_textio_compressed(self):
temp_path = self.create_temp_file('aa\nbb\ncc')
gzip_file_name = temp_path + '.gz'
- with open(temp_path) as src, gzip.open(gzip_file_name, 'wb') as dst:
+ with open(temp_path, 'rb') as src, gzip.open(gzip_file_name, 'wb') as dst:
dst.writelines(src)
# Add the temporary gzip file to be cleaned up as well.
self.temp_files.append(gzip_file_name)
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
index 3e5afed..cf824d1 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py
@@ -41,7 +41,7 @@ class WordCountTest(unittest.TestCase):
results = []
with open_shards(temp_path + '.result-*-of-*') as result_file:
for line in result_file:
- match = re.search(r'([A-Za-z]+): ([0-9]+)', line.decode('utf-8'))
+ match = re.search(r'([A-Za-z]+): ([0-9]+)', line)
if match is not None:
results.append((match.group(1), int(match.group(2))))
return results
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
index c79bb0a..011e678 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py
@@ -51,7 +51,7 @@ class WordCountMinimalTest(unittest.TestCase):
results = []
with open_shards(temp_path + '.result-*-of-*') as result_file:
for line in result_file:
- match = re.search(r'([a-z]+): ([0-9]+)', line.decode('utf-8'))
+ match = re.search(r'([a-z]+): ([0-9]+)', line)
if match is not None:
results.append((match.group(1), int(match.group(2))))
self.assertEqual(sorted(results), sorted(expected_words.items()))
diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py
index 1c36c04..aa131cb 100644
--- a/sdks/python/apache_beam/examples/wordcount_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_test.py
@@ -52,7 +52,7 @@ class WordCountTest(unittest.TestCase):
results = []
with open_shards(temp_path + '.result-*-of-*') as result_file:
for line in result_file:
- match = re.search(r'(\S+): ([0-9]+)', line.decode('utf-8'))
+ match = re.search(r'(\S+): ([0-9]+)', line)
if match is not None:
results.append((match.group(1), int(match.group(2))))
self.assertEqual(sorted(results), sorted(expected_words.items()))
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 4dbca4d..7e68540 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
import collections
import glob
+import io
import tempfile
from builtins import object
@@ -213,11 +214,25 @@ def assert_that(actual, matcher, label='assert_that',
@experimental()
-def open_shards(glob_pattern):
- """Returns a composite file of all shards matching the given glob pattern."""
+def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
+ """Returns a composite file of all shards matching the given glob pattern.
+
+ Args:
+ glob_pattern (str): Pattern used to match files which should be opened.
+ mode (str): Specify the mode in which the file should be opened. For
+ available modes, check io.open() documentation.
+ encoding (str): Name of the encoding used to decode or encode the file.
+ This should only be used in text mode.
+
+ Returns:
+ A stream with the contents of the opened files.
+ """
+ if 'b' in mode:
+ encoding = None
+
with tempfile.NamedTemporaryFile(delete=False) as out_file:
for shard in glob.glob(glob_pattern):
with open(shard, 'rb') as in_file:
out_file.write(in_file.read())
concatenated_file_name = out_file.name
- return open(concatenated_file_name, 'rb')
+ return io.open(concatenated_file_name, mode, encoding=encoding)