You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2020/02/13 03:04:13 UTC
[beam] 01/01: Revert "[BEAM-7198] rename ToStringCoder to ToBytesCoder for proper representation of its role"
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch revert-10828-rename_tostringcoder
in repository https://gitbox.apache.org/repos/asf/beam.git
commit f2284fbe6f3fc2a0a3e89ae57d5659fa13816768
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Wed Feb 12 19:03:51 2020 -0800
Revert "[BEAM-7198] rename ToStringCoder to ToBytesCoder for proper representation of its role"
---
sdks/python/apache_beam/coders/coders.py | 8 ++-----
.../apache_beam/coders/coders_test_common.py | 2 +-
.../apache_beam/examples/snippets/snippets_test.py | 4 ++--
sdks/python/apache_beam/io/filebasedsink_test.py | 26 +++++++++++-----------
sdks/python/apache_beam/io/textio.py | 6 ++---
5 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 55356ab..3d1ddbf 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -423,7 +423,7 @@ class StrUtf8Coder(Coder):
Coder.register_structured_urn(common_urns.coders.STRING_UTF8.urn, StrUtf8Coder)
-class ToBytesCoder(Coder):
+class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""
if sys.version_info.major == 2:
@@ -440,16 +440,12 @@ class ToBytesCoder(Coder):
return value if isinstance(value, bytes) else str(value).encode('utf-8')
def decode(self, _):
- raise NotImplementedError('ToBytesCoder cannot be used for decoding.')
+ raise NotImplementedError('ToStringCoder cannot be used for decoding.')
def is_deterministic(self):
return True
-# alias to the old class name for a courtesy to users who reference it
-ToStringCoder = ToBytesCoder
-
-
class FastCoder(Coder):
"""Coder subclass used when a (faster) CoderImpl is supplied directly.
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index fd21261..4a9a9d0 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -78,7 +78,7 @@ class CodersTest(unittest.TestCase):
coders.FastCoder,
coders.ProtoCoder,
coders.RunnerAPICoderHolder,
- coders.ToBytesCoder
+ coders.ToStringCoder
])
assert not standard - cls.seen, standard - cls.seen
assert not standard - cls.seen_nested, standard - cls.seen_nested
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 4efe658..f54ccee 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -43,7 +43,7 @@ from apache_beam import WindowInto
from apache_beam import coders
from apache_beam import pvalue
from apache_beam import typehints
-from apache_beam.coders.coders import ToBytesCoder
+from apache_beam.coders.coders import ToStringCoder
from apache_beam.examples.snippets import snippets
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
@@ -468,7 +468,7 @@ class SnippetsTest(unittest.TestCase):
def __init__(self, file_to_write):
self.file_to_write = file_to_write
self.file_obj = None
- self.coder = ToBytesCoder()
+ self.coder = ToStringCoder()
def start_bundle(self):
assert self.file_to_write
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index aab1ffa..15b461f 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -131,7 +131,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_writing(self):
temp_path = os.path.join(self._new_tempdir(), 'FileBasedSink')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, writer_results = self._common_init(sink)
@@ -156,7 +156,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_display_data(self):
temp_path = os.path.join(self._new_tempdir(), 'display')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
dd = DisplayData.create_from(sink)
expected_items = [
DisplayDataItemMatcher('compression', 'auto'),
@@ -170,7 +170,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_empty_write(self):
temp_path = tempfile.NamedTemporaryFile().name
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
with TestPipeline() as p:
p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
self.assertEqual(
@@ -182,7 +182,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
sink = MyFileBasedSink(
temp_path,
file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
- coder=coders.ToBytesCoder())
+ coder=coders.ToStringCoder())
with TestPipeline() as p:
p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
self.assertEqual(
@@ -195,7 +195,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
file_name_suffix='.output',
num_shards=3,
shard_name_template='_NN_SSS_',
- coder=coders.ToBytesCoder())
+ coder=coders.ToStringCoder())
with TestPipeline() as p:
p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
@@ -218,7 +218,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
sink = MyFileBasedSink(
file_path_prefix,
file_name_suffix='.output',
- coder=coders.ToBytesCoder())
+ coder=coders.ToStringCoder())
return sink.initialize_write()
temp_dir = _get_temp_dir(no_dir_path)
@@ -239,7 +239,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_temp_dir_uniqueness(self):
temp_path = os.path.join(self._new_tempdir(), 'unique')
- sink = MyFileBasedSink(temp_path, coder=coders.ToBytesCoder())
+ sink = MyFileBasedSink(temp_path, coder=coders.ToStringCoder())
init_list = [''] * 1000
temp_dir_list = [sink._create_temp_dir(temp_path) for _ in init_list]
temp_dir_set = set(temp_dir_list)
@@ -280,7 +280,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_multi_shards(self):
temp_path = os.path.join(self._new_tempdir(), 'multishard')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
# Manually invoke the generic Sink API.
init_token = sink.initialize_write()
@@ -313,7 +313,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_rename_error(self, rename_mock):
temp_path = os.path.join(self._new_tempdir(), 'rename_error')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, writer_results = self._common_init(sink)
pre_finalize_results = sink.pre_finalize(init_token, writer_results)
@@ -327,7 +327,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_src_missing(self):
temp_path = os.path.join(self._new_tempdir(), 'src_missing')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, writer_results = self._common_init(sink)
pre_finalize_results = sink.pre_finalize(init_token, writer_results)
@@ -339,7 +339,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_file_sink_dst_matches_src(self):
temp_path = os.path.join(self._new_tempdir(), 'dst_matches_src')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, [res1, res2] = self._common_init(sink)
pre_finalize_results = sink.pre_finalize(init_token, [res1, res2])
@@ -360,7 +360,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_pre_finalize(self):
temp_path = os.path.join(self._new_tempdir(), 'pre_finalize')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, [res1, res2] = self._common_init(sink)
# no-op
@@ -389,7 +389,7 @@ class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
def test_pre_finalize_error(self, delete_mock):
temp_path = os.path.join(self._new_tempdir(), 'pre_finalize')
sink = MyFileBasedSink(
- temp_path, file_name_suffix='.output', coder=coders.ToBytesCoder())
+ temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
init_token, [res1, res2] = self._common_init(sink)
# no-op
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 4099e87..79095fc 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -348,7 +348,7 @@ class _TextSink(filebasedsink.FileBasedSink):
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
- coder=coders.ToBytesCoder(), # type: coders.Coder
+ coder=coders.ToStringCoder(), # type: coders.Coder
compression_type=CompressionTypes.AUTO,
header=None):
"""Initialize a _TextSink.
@@ -400,7 +400,7 @@ class _TextSink(filebasedsink.FileBasedSink):
def open(self, temp_path):
file_handle = super(_TextSink, self).open(temp_path)
if self._header is not None:
- file_handle.write(coders.ToBytesCoder().encode(self._header))
+ file_handle.write(coders.ToStringCoder().encode(self._header))
if self._append_trailing_newlines:
file_handle.write(b'\n')
return file_handle
@@ -586,7 +586,7 @@ class WriteToText(PTransform):
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None, # type: Optional[str]
- coder=coders.ToBytesCoder(), # type: coders.Coder
+ coder=coders.ToStringCoder(), # type: coders.Coder
compression_type=CompressionTypes.AUTO,
header=None):
r"""Initialize a :class:`WriteToText` transform.