You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/08/10 19:24:38 UTC
[beam] branch master updated: Several improvements for consistency
of Python2 / Python3 conversions. (#6144)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c7ce77d Several improvements for consistency of Python2 / Python3 conversions. (#6144)
c7ce77d is described below
commit c7ce77de32c99e7d4fdacfeae5709d8cdd8b8838
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Fri Aug 10 12:24:33 2018 -0700
Several improvements for consistency of Python2 / Python3 conversions. (#6144)
* Use (str, past.builtins.unicode) where six.string_types was used in the past.
* Use past.builtins.unicode where six.text_type was used in the past.
* For Py2/Py3 compatibility, add complementary __ne__ methods: Python 3 derives __ne__ from __eq__ automatically, but Python 2 does not.
* Use bytes where six.binary_type was used in the past.
---
sdks/python/apache_beam/io/gcp/pubsub.py | 15 +++++++--------
sdks/python/apache_beam/io/gcp/pubsub_test.py | 4 ++--
sdks/python/apache_beam/runners/common.py | 6 +++---
sdks/python/apache_beam/testing/test_stream.py | 9 +++++++++
4 files changed, 21 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index d5c3f99..2414194 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -27,8 +27,7 @@ from __future__ import absolute_import
import re
from builtins import object
-import six
-from past.builtins import basestring
+from past.builtins import unicode
from apache_beam import coders
from apache_beam.io.iobase import Read
@@ -58,7 +57,7 @@ class PubsubMessage(object):
This interface is experimental. No backwards compatibility guarantees.
Attributes:
- data: (six.binary_type) Message data. May be None.
+ data: (bytes) Message data. May be None.
attributes: (dict) Key-value map of str to str, containing both user-defined
and service generated attributes (such as id_label and
timestamp_attribute). May be None.
@@ -151,7 +150,7 @@ class ReadFromPubSub(PTransform):
case, deduplication of the stream will be strictly best effort.
with_attributes:
True - output elements will be :class:`~PubsubMessage` objects.
- False - output elements will be of type ``six.binary_type`` (message
+ False - output elements will be of type ``bytes`` (message
data only).
timestamp_attribute: Message value to use as element timestamp. If None,
uses message publishing time as the timestamp.
@@ -176,7 +175,7 @@ class ReadFromPubSub(PTransform):
def expand(self, pvalue):
pcoll = pvalue.pipeline | Read(self._source)
- pcoll.element_type = six.binary_type
+ pcoll.element_type = bytes
if self.with_attributes:
pcoll = pcoll | Map(PubsubMessage._from_proto_str)
pcoll.element_type = PubsubMessage
@@ -207,7 +206,7 @@ class _ReadStringsFromPubSub(PTransform):
| ReadFromPubSub(self.topic, self.subscription, self.id_label,
with_attributes=False)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
- p.element_type = basestring
+ p.element_type = unicode
return p
@@ -247,7 +246,7 @@ class WriteToPubSub(PTransform):
topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
with_attributes:
True - input elements will be :class:`~PubsubMessage` objects.
- False - input elements will be of type ``six.binary_type`` (message
+ False - input elements will be of type ``bytes`` (message
data only).
id_label: If set, will set an attribute for each Cloud Pub/Sub message
with the given name and a unique value. This attribute can then be used
@@ -276,7 +275,7 @@ class WriteToPubSub(PTransform):
# Without attributes, message data is written as-is. With attributes,
# message data + attributes are passed as a serialized protobuf string (see
# ``PubsubMessage._to_proto_str`` for exact protobuf message type).
- pcoll.element_type = six.binary_type
+ pcoll.element_type = bytes
return pcoll | Write(self._sink)
def to_runner_api_parameter(self, context):
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 914b72a..44024cd 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -127,7 +127,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
None, 'a_label', with_attributes=False,
timestamp_attribute=None)
| beam.Map(lambda x: x))
- self.assertEqual(str, pcoll.element_type)
+ self.assertEqual(bytes, pcoll.element_type)
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
@@ -150,7 +150,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
None, 'projects/fakeprj/subscriptions/a_subscription',
'a_label', with_attributes=False, timestamp_attribute=None)
| beam.Map(lambda x: x))
- self.assertEqual(str, pcoll.element_type)
+ self.assertEqual(bytes, pcoll.element_type)
# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 38ae666..a714eab 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -31,7 +31,7 @@ from builtins import object
from builtins import zip
from future.utils import raise_
-from past.builtins import basestring
+from past.builtins import unicode
from apache_beam.internal import util
from apache_beam.options.value_provider import RuntimeValueProvider
@@ -677,7 +677,7 @@ class _OutputProcessor(OutputProcessor):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
- if not isinstance(tag, basestring):
+ if not isinstance(tag, (str, unicode)):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
if isinstance(result, WindowedValue):
@@ -724,7 +724,7 @@ class _OutputProcessor(OutputProcessor):
tag = None
if isinstance(result, TaggedOutput):
tag = result.tag
- if not isinstance(tag, basestring):
+ if not isinstance(tag, (str, unicode)):
raise TypeError('In %s, tag %s is not a string' % (self, tag))
result = result.value
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index caa74ec..519a1b5 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -72,6 +72,9 @@ class ElementEvent(Event):
def __eq__(self, other):
return self.timestamped_values == other.timestamped_values
+ def __ne__(self, other):
+ return not self == other
+
def __hash__(self):
return hash(self.timestamped_values)
@@ -88,6 +91,9 @@ class WatermarkEvent(Event):
def __eq__(self, other):
return self.new_watermark == other.new_watermark
+ def __ne__(self, other):
+ return not self == other
+
def __hash__(self):
return hash(self.new_watermark)
@@ -104,6 +110,9 @@ class ProcessingTimeEvent(Event):
def __eq__(self, other):
return self.advance_by == other.advance_by
+ def __ne__(self, other):
+ return not self == other
+
def __hash__(self):
return hash(self.advance_by)