You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/08/10 19:24:38 UTC

[beam] branch master updated: Several improvements for consistency of Python2 / Python3 conversions. (#6144)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ce77d  Several improvements for consistency of Python2 / Python3 conversions. (#6144)
c7ce77d is described below

commit c7ce77de32c99e7d4fdacfeae5709d8cdd8b8838
Author: tvalentyn <tv...@users.noreply.github.com>
AuthorDate: Fri Aug 10 12:24:33 2018 -0700

    Several improvements for consistency of Python2 / Python3 conversions. (#6144)
    
    * Use (str, past.builtins.unicode) where six.string_types was used in the past.
    
    * Use past.builtins.unicode where six.text_type was used in the past.
    
    * For Py2/Py3 compatibility, add complementary __ne__ methods which are autogenerated in Py3.
    
    * Use bytes where six.binary_type was used in the past.
---
 sdks/python/apache_beam/io/gcp/pubsub.py       | 15 +++++++--------
 sdks/python/apache_beam/io/gcp/pubsub_test.py  |  4 ++--
 sdks/python/apache_beam/runners/common.py      |  6 +++---
 sdks/python/apache_beam/testing/test_stream.py |  9 +++++++++
 4 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index d5c3f99..2414194 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -27,8 +27,7 @@ from __future__ import absolute_import
 import re
 from builtins import object
 
-import six
-from past.builtins import basestring
+from past.builtins import unicode
 
 from apache_beam import coders
 from apache_beam.io.iobase import Read
@@ -58,7 +57,7 @@ class PubsubMessage(object):
   This interface is experimental. No backwards compatibility guarantees.
 
   Attributes:
-    data: (six.binary_type) Message data. May be None.
+    data: (bytes) Message data. May be None.
     attributes: (dict) Key-value map of str to str, containing both user-defined
       and service generated attributes (such as id_label and
       timestamp_attribute). May be None.
@@ -151,7 +150,7 @@ class ReadFromPubSub(PTransform):
         case, deduplication of the stream will be strictly best effort.
       with_attributes:
         True - output elements will be :class:`~PubsubMessage` objects.
-        False - output elements will be of type ``six.binary_type`` (message
+        False - output elements will be of type ``bytes`` (message
         data only).
       timestamp_attribute: Message value to use as element timestamp. If None,
         uses message publishing time as the timestamp.
@@ -176,7 +175,7 @@ class ReadFromPubSub(PTransform):
 
   def expand(self, pvalue):
     pcoll = pvalue.pipeline | Read(self._source)
-    pcoll.element_type = six.binary_type
+    pcoll.element_type = bytes
     if self.with_attributes:
       pcoll = pcoll | Map(PubsubMessage._from_proto_str)
       pcoll.element_type = PubsubMessage
@@ -207,7 +206,7 @@ class _ReadStringsFromPubSub(PTransform):
          | ReadFromPubSub(self.topic, self.subscription, self.id_label,
                           with_attributes=False)
          | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
-    p.element_type = basestring
+    p.element_type = unicode
     return p
 
 
@@ -247,7 +246,7 @@ class WriteToPubSub(PTransform):
       topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
       with_attributes:
         True - input elements will be :class:`~PubsubMessage` objects.
-        False - input elements will be of type ``six.binary_type`` (message
+        False - input elements will be of type ``bytes`` (message
         data only).
       id_label: If set, will set an attribute for each Cloud Pub/Sub message
         with the given name and a unique value. This attribute can then be used
@@ -276,7 +275,7 @@ class WriteToPubSub(PTransform):
     # Without attributes, message data is written as-is. With attributes,
     # message data + attributes are passed as a serialized protobuf string (see
     # ``PubsubMessage._to_proto_str`` for exact protobuf message type).
-    pcoll.element_type = six.binary_type
+    pcoll.element_type = bytes
     return pcoll | Write(self._sink)
 
   def to_runner_api_parameter(self, context):
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 914b72a..44024cd 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -127,7 +127,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
                               None, 'a_label', with_attributes=False,
                               timestamp_attribute=None)
              | beam.Map(lambda x: x))
-    self.assertEqual(str, pcoll.element_type)
+    self.assertEqual(bytes, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
     overrides = _get_transform_overrides(p.options)
@@ -150,7 +150,7 @@ class TestReadFromPubSubOverride(unittest.TestCase):
                  None, 'projects/fakeprj/subscriptions/a_subscription',
                  'a_label', with_attributes=False, timestamp_attribute=None)
              | beam.Map(lambda x: x))
-    self.assertEqual(str, pcoll.element_type)
+    self.assertEqual(bytes, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
     overrides = _get_transform_overrides(p.options)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 38ae666..a714eab 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -31,7 +31,7 @@ from builtins import object
 from builtins import zip
 
 from future.utils import raise_
-from past.builtins import basestring
+from past.builtins import unicode
 
 from apache_beam.internal import util
 from apache_beam.options.value_provider import RuntimeValueProvider
@@ -677,7 +677,7 @@ class _OutputProcessor(OutputProcessor):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, basestring):
+        if not isinstance(tag, (str, unicode)):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
       if isinstance(result, WindowedValue):
@@ -724,7 +724,7 @@ class _OutputProcessor(OutputProcessor):
       tag = None
       if isinstance(result, TaggedOutput):
         tag = result.tag
-        if not isinstance(tag, basestring):
+        if not isinstance(tag, (str, unicode)):
           raise TypeError('In %s, tag %s is not a string' % (self, tag))
         result = result.value
 
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index caa74ec..519a1b5 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -72,6 +72,9 @@ class ElementEvent(Event):
   def __eq__(self, other):
     return self.timestamped_values == other.timestamped_values
 
+  def __ne__(self, other):
+    return not self == other
+
   def __hash__(self):
     return hash(self.timestamped_values)
 
@@ -88,6 +91,9 @@ class WatermarkEvent(Event):
   def __eq__(self, other):
     return self.new_watermark == other.new_watermark
 
+  def __ne__(self, other):
+    return not self == other
+
   def __hash__(self):
     return hash(self.new_watermark)
 
@@ -104,6 +110,9 @@ class ProcessingTimeEvent(Event):
   def __eq__(self, other):
     return self.advance_by == other.advance_by
 
+  def __ne__(self, other):
+    return not self == other
+
   def __hash__(self):
     return hash(self.advance_by)