You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/04 05:31:23 UTC
[beam] branch master updated: [BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bdeb4af [BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)
bdeb4af is described below
commit bdeb4afa02bc3fe190c3f5fd397e8ec97d1586c4
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Nov 3 21:30:38 2020 -0800
[BEAM-10869] Remove unused PubSubSink with_attributes property (#13254)
* Remove unused PubSubSink with_attributes property
* Fixes yapf
---
.../pipeline/src/main/proto/beam_runner_api.proto | 4 --
sdks/python/apache_beam/io/gcp/pubsub.py | 65 +++++++++++-----------
sdks/python/apache_beam/io/gcp/pubsub_test.py | 5 --
sdks/python/apache_beam/io/iobase.py | 4 +-
4 files changed, 32 insertions(+), 46 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 3a43ef6..b617e51 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -688,10 +688,6 @@ message PubSubWritePayload {
// Attribute that uniquely identify messages.
string id_attribute = 3;
-
- // If true, writes Pub/Sub payload as well as attributes. If false, reads only the payload.
- // TODO(BEAM-10869): consider removing/deprecating this field when fixed.
- bool with_attributes = 4;
}
// A coder, the binary format for serialization and deserialization of data in
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 6c90a0b..1db7ba9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -145,13 +145,14 @@ class ReadFromPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.
- def __init__(self,
- topic=None, # type: Optional[str]
- subscription=None, # type: Optional[str]
- id_label=None, # type: Optional[str]
- with_attributes=False, # type: bool
- timestamp_attribute=None # type: Optional[str]
- ):
+ def __init__(
+ self,
+ topic=None, # type: Optional[str]
+ subscription=None, # type: Optional[str]
+ id_label=None, # type: Optional[str]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None # type: Optional[str]
+ ):
# type: (...) -> None
"""Initializes ``ReadFromPubSub``.
@@ -250,8 +251,7 @@ class _WriteStringsToPubSub(PTransform):
self.id_label = None
self.timestamp_attribute = None
self.project, self.topic_name = parse_topic(topic)
- self._sink = _PubSubSink(
- topic, id_label=None, with_attributes=False, timestamp_attribute=None)
+ self._sink = _PubSubSink(topic, id_label=None, timestamp_attribute=None)
def expand(self, pcoll):
pcoll = pcoll | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
@@ -264,12 +264,13 @@ class WriteToPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.
- def __init__(self,
- topic, # type: str
- with_attributes=False, # type: bool
- id_label=None, # type: Optional[str]
- timestamp_attribute=None # type: Optional[str]
- ):
+ def __init__(
+ self,
+ topic, # type: str
+ with_attributes=False, # type: bool
+ id_label=None, # type: Optional[str]
+ timestamp_attribute=None # type: Optional[str]
+ ):
# type: (...) -> None
"""Initializes ``WriteToPubSub``.
@@ -292,8 +293,7 @@ class WriteToPubSub(PTransform):
self.timestamp_attribute = timestamp_attribute
self.project, self.topic_name = parse_topic(topic)
self.full_topic = topic
- self._sink = _PubSubSink(
- topic, id_label, with_attributes, timestamp_attribute)
+ self._sink = _PubSubSink(topic, id_label, timestamp_attribute)
@staticmethod
def message_to_proto_str(element):
@@ -373,14 +373,14 @@ class _PubSubSource(dataflow_io.NativeSource):
with_attributes: If False, will fetch just message data. Otherwise,
fetches ``PubsubMessage`` protobufs.
"""
-
- def __init__(self,
- topic=None, # type: Optional[str]
- subscription=None, # type: Optional[str]
- id_label=None, # type: Optional[str]
- with_attributes=False, # type: bool
- timestamp_attribute=None # type: Optional[str]
- ):
+ def __init__(
+ self,
+ topic=None, # type: Optional[str]
+ subscription=None, # type: Optional[str]
+ id_label=None, # type: Optional[str]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None # type: Optional[str]
+ ):
self.coder = coders.BytesCoder()
self.full_topic = topic
self.full_subscription = subscription
@@ -433,18 +433,15 @@ class _PubSubSink(dataflow_io.NativeSink):
This ``NativeSource`` is overridden by a native Pubsub implementation.
"""
-
- def __init__(self,
- topic, # type: str
- id_label, # type: Optional[str]
- with_attributes, # type: bool
- timestamp_attribute # type: Optional[str]
- ):
+ def __init__(
+ self,
+ topic, # type: str
+ id_label, # type: Optional[str]
+ timestamp_attribute # type: Optional[str]
+ ):
self.coder = coders.BytesCoder()
self.full_topic = topic
self.id_label = id_label
- #TODO(BEAM-10869): Remove with_attributes since we will never look at it.
- self.with_attributes = with_attributes
self.timestamp_attribute = timestamp_attribute
self.project, self.topic_name = parse_topic(topic)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 40fe386..0ffe105 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -781,7 +781,6 @@ class TestWriteToPubSub(unittest.TestCase):
sink = _PubSubSink(
topic='projects/fakeprj/topics/a_topic',
id_label=None,
- with_attributes=True,
# We expect encoded PubSub write transform to always return attributes.
timestamp_attribute=None)
transform = Write(sink)
@@ -798,7 +797,6 @@ class TestWriteToPubSub(unittest.TestCase):
self.assertEqual(
'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
- self.assertTrue(pubsub_write_payload.with_attributes)
proto_transform = beam_runner_api_pb2.PTransform(
unique_name="dummy_label", spec=proto_transform_spec)
@@ -807,7 +805,6 @@ class TestWriteToPubSub(unittest.TestCase):
proto_transform, pubsub_write_payload, None)
self.assertTrue(isinstance(transform_from_proto, Write))
self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
- self.assertTrue(transform_from_proto.sink.with_attributes)
self.assertEqual(
'projects/fakeprj/topics/a_topic', transform_from_proto.sink.full_topic)
@@ -816,7 +813,6 @@ class TestWriteToPubSub(unittest.TestCase):
sink = _PubSubSink(
topic='projects/fakeprj/topics/a_topic',
id_label=None,
- with_attributes=True,
# We expect encoded PubSub write transform to always return attributes.
timestamp_attribute=None)
transform = Write(sink)
@@ -837,7 +833,6 @@ class TestWriteToPubSub(unittest.TestCase):
self.assertTrue(isinstance(transform_from_proto, Write))
self.assertTrue(isinstance(transform_from_proto.sink, _PubSubSink))
- self.assertTrue(transform_from_proto.sink.with_attributes)
self.assertIsNone(transform_from_proto.sink.id_label)
self.assertIsNone(transform_from_proto.sink.timestamp_attribute)
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e32a0d2..b0cf4e3 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1036,8 +1036,7 @@ class Write(ptransform.PTransform):
payload = beam_runner_api_pb2.PubSubWritePayload(
topic=self.sink.full_topic,
id_attribute=self.sink.id_label,
- timestamp_attribute=self.sink.timestamp_attribute,
- with_attributes=self.sink.with_attributes)
+ timestamp_attribute=self.sink.timestamp_attribute)
return (common_urns.composites.PUBSUB_WRITE.urn, payload)
else:
return super(Write, self).to_runner_api_parameter(context)
@@ -1058,7 +1057,6 @@ class Write(ptransform.PTransform):
sink = _PubSubSink(
topic=payload.topic or None,
id_label=payload.id_attribute or None,
- with_attributes=payload.with_attributes,
timestamp_attribute=payload.timestamp_attribute or None)
return Write(sink)