You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/04 05:31:23 UTC

[beam] branch master updated: [BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bdeb4af  [BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)
bdeb4af is described below

commit bdeb4afa02bc3fe190c3f5fd397e8ec97d1586c4
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Nov 3 21:30:38 2020 -0800

    [BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)
    
    * Remove unused PubSubSink with_attributes property
    
    * Fixes yapf
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |  4 --
 sdks/python/apache_beam/io/gcp/pubsub.py           | 65 +++++++++++-----------
 sdks/python/apache_beam/io/gcp/pubsub_test.py      |  5 --
 sdks/python/apache_beam/io/iobase.py               |  4 +-
 4 files changed, 32 insertions(+), 46 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 3a43ef6..b617e51 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -688,10 +688,6 @@ message PubSubWritePayload {
 
   // Attribute that uniquely identify messages.
   string id_attribute = 3;
-
-  // If true, writes Pub/Sub payload as well as attributes. If false, reads only the payload.
-  // TODO(BEAM-10869): consider removing/deprecating this field when fixed.
-  bool with_attributes = 4;
 }
 
 // A coder, the binary format for serialization and deserialization of data in
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 6c90a0b..1db7ba9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -145,13 +145,14 @@ class ReadFromPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(self,
-               topic=None,  # type: Optional[str]
-               subscription=None,  # type: Optional[str]
-               id_label=None,  # type: Optional[str]
-               with_attributes=False,  # type: bool
-               timestamp_attribute=None  # type: Optional[str]
-              ):
+  def __init__(
+      self,
+      topic=None,  # type: Optional[str]
+      subscription=None,  # type: Optional[str]
+      id_label=None,  # type: Optional[str]
+      with_attributes=False,  # type: bool
+      timestamp_attribute=None  # type: Optional[str]
+  ):
     # type: (...) -> None
 
     """Initializes ``ReadFromPubSub``.
@@ -250,8 +251,7 @@ class _WriteStringsToPubSub(PTransform):
     self.id_label = None
     self.timestamp_attribute = None
     self.project, self.topic_name = parse_topic(topic)
-    self._sink = _PubSubSink(
-        topic, id_label=None, with_attributes=False, timestamp_attribute=None)
+    self._sink = _PubSubSink(topic, id_label=None, timestamp_attribute=None)
 
   def expand(self, pcoll):
     pcoll = pcoll | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
@@ -264,12 +264,13 @@ class WriteToPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(self,
-               topic,  # type: str
-               with_attributes=False,  # type: bool
-               id_label=None,  # type: Optional[str]
-               timestamp_attribute=None  # type: Optional[str]
-              ):
+  def __init__(
+      self,
+      topic,  # type: str
+      with_attributes=False,  # type: bool
+      id_label=None,  # type: Optional[str]
+      timestamp_attribute=None  # type: Optional[str]
+  ):
     # type: (...) -> None
 
     """Initializes ``WriteToPubSub``.
@@ -292,8 +293,7 @@ class WriteToPubSub(PTransform):
     self.timestamp_attribute = timestamp_attribute
     self.project, self.topic_name = parse_topic(topic)
     self.full_topic = topic
-    self._sink = _PubSubSink(
-        topic, id_label, with_attributes, timestamp_attribute)
+    self._sink = _PubSubSink(topic, id_label, timestamp_attribute)
 
   @staticmethod
   def message_to_proto_str(element):
@@ -373,14 +373,14 @@ class _PubSubSource(dataflow_io.NativeSource):
     with_attributes: If False, will fetch just message data. Otherwise,
       fetches ``PubsubMessage`` protobufs.
   """
-
-  def __init__(self,
-               topic=None,  # type: Optional[str]
-               subscription=None,  # type: Optional[str]
-               id_label=None,  # type: Optional[str]
-               with_attributes=False,  # type: bool
-               timestamp_attribute=None  # type: Optional[str]
-              ):
+  def __init__(
+      self,
+      topic=None,  # type: Optional[str]
+      subscription=None,  # type: Optional[str]
+      id_label=None,  # type: Optional[str]
+      with_attributes=False,  # type: bool
+      timestamp_attribute=None  # type: Optional[str]
+  ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.full_subscription = subscription
@@ -433,18 +433,15 @@ class _PubSubSink(dataflow_io.NativeSink):
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
   """
-
-  def __init__(self,
-               topic,  # type: str
-               id_label,  # type: Optional[str]
-               with_attributes,  # type: bool
-               timestamp_attribute  # type: Optional[str]
-               ):
+  def __init__(
+      self,
+      topic,  # type: str
+      id_label,  # type: Optional[str]
+      timestamp_attribute  # type: Optional[str]
+  ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.id_label = id_label
-    #TODO(BEAM-10869): Remove with_attributes since we will never look at it.
-    self.with_attributes = with_attributes
     self.timestamp_attribute = timestamp_attribute
 
     self.project, self.topic_name = parse_topic(topic)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 40fe386..0ffe105 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -781,7 +781,6 @@ class TestWriteToPubSub(unittest.TestCase):
     sink = _PubSubSink(
         topic='projects/fakeprj/topics/a_topic',
         id_label=None,
-        with_attributes=True,
         # We expect encoded PubSub write transform to always return attributes.
         timestamp_attribute=None)
     transform = Write(sink)
@@ -798,7 +797,6 @@ class TestWriteToPubSub(unittest.TestCase):
 
     self.assertEqual(
         'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
-    self.assertTrue(pubsub_write_payload.with_attributes)
 
     proto_transform = beam_runner_api_pb2.PTransform(
         unique_name="dummy_label", spec=proto_transform_spec)
@@ -807,7 +805,6 @@ class TestWriteToPubSub(unittest.TestCase):
         proto_transform, pubsub_write_payload, None)
     self.assertTrue(isinstance(transform_from_proto, Write))
     self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
-    self.assertTrue(transform_from_proto.sink.with_attributes)
     self.assertEqual(
         'projects/fakeprj/topics/a_topic', transform_from_proto.sink.full_topic)
 
@@ -816,7 +813,6 @@ class TestWriteToPubSub(unittest.TestCase):
     sink = _PubSubSink(
         topic='projects/fakeprj/topics/a_topic',
         id_label=None,
-        with_attributes=True,
         # We expect encoded PubSub write transform to always return attributes.
         timestamp_attribute=None)
     transform = Write(sink)
@@ -837,7 +833,6 @@ class TestWriteToPubSub(unittest.TestCase):
 
     self.assertTrue(isinstance(transform_from_proto, Write))
     self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
-    self.assertTrue(transform_from_proto.sink.with_attributes)
     self.assertIsNone(transform_from_proto.sink.id_label)
     self.assertIsNone(transform_from_proto.sink.timestamp_attribute)
 
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e32a0d2..b0cf4e3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1036,8 +1036,7 @@ class Write(ptransform.PTransform):
       payload = beam_runner_api_pb2.PubSubWritePayload(
           topic=self.sink.full_topic,
           id_attribute=self.sink.id_label,
-          timestamp_attribute=self.sink.timestamp_attribute,
-          with_attributes=self.sink.with_attributes)
+          timestamp_attribute=self.sink.timestamp_attribute)
       return (common_urns.composites.PUBSUB_WRITE.urn, payload)
     else:
       return super(Write, self).to_runner_api_parameter(context)
@@ -1058,7 +1057,6 @@ class Write(ptransform.PTransform):
     sink = _PubSubSink(
         topic=payload.topic or None,
         id_label=payload.id_attribute or None,
-        with_attributes=payload.with_attributes,
         timestamp_attribute=payload.timestamp_attribute or None)
     return Write(sink)