You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/09/23 14:33:45 UTC
[beam] 01/01: Revert "[BEAM-10861] Adds URNs and payloads to PubSub
transforms to allow runner-native overrides (#12760)"
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch revert-12760-add_pubsub_payload
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 967d7288cd5069f21617c98a9156dd4076bb9fb9
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Sep 23 07:32:56 2020 -0700
Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"
This reverts commit a19d06eaf5d9639df330915ec175c8c8082c4980.
---
.../pipeline/src/main/proto/beam_runner_api.proto | 54 ----------
sdks/python/apache_beam/io/gcp/pubsub.py | 115 ++++++---------------
sdks/python/apache_beam/io/gcp/pubsub_test.py | 81 ---------------
3 files changed, 34 insertions(+), 216 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 848522b..0298b3f 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -324,12 +324,6 @@ message StandardPTransforms {
// Less well-known. Payload: WriteFilesPayload.
WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
-
- // Payload: PubSubReadPayload.
- PUBSUB_READ = 4 [(beam_urn) = "beam:transform:pubsub_read:v1"];
-
- // Payload: PubSubWritePayload.
- PUBSUB_WRITE = 5 [(beam_urn) = "beam:transform:pubsub_write:v1"];
}
// Payload for all of these: CombinePayload
enum CombineComponents {
@@ -656,54 +650,6 @@ message WriteFilesPayload {
map<string, SideInput> side_inputs = 5;
}
-// Payload used by Google Cloud Pub/Sub read transform.
-// This can be used by runners that wish to override Beam Pub/Sub read transform
-// with a native implementation.
-message PubSubReadPayload {
-
- // Topic to read from. Exactly one of topic or subscription should be set.
- string topic = 1;
-
- // Subscription to read from. Exactly one of topic or subscription should be set.
- string subscription = 2;
-
- // Attribute that provides element timestamps.
- string timestamp_attribute = 3;
-
- // Attribute to be used for uniquely identifying messages.
- string id_attribute = 4;
-
- // If true, reads Pub/Sub payload as well as attributes. If false, reads only the payload.
- bool with_attributes = 5;
-
- // JSON serialized parse function for attibutes.
- string serialized_attribute_fn = 6;
-}
-
-// Payload used by Google Cloud Pub/Sub write transform.
-// This can be used by runners that wish to override Beam Pub/Sub write transform
-// with a native implementation.
-message PubSubWritePayload {
-
- // Topic to write to.
- string topic = 1;
-
- // Attribute that provides element timestamps.
- string timestamp_attribute = 2;
-
- // Attribute that uniquely identify messages.
- string id_attribute = 3;
-
- // If true, writes Pub/Sub payload as well as attributes. If false, reads only the payload.
- bool with_attributes = 4;
-
- // JSON serialized parse function for attributes.
- string serialized_attribute_fn = 5;
-
- // Coder ID for the Pub/Sub sink (if the runner requires it).
- string coder_id = 6;
-}
-
// A coder, the binary format for serialization and deserialization of data in
// a pipeline.
message Coder {
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index af5fd9b..b0f8bdf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,8 +38,6 @@ from past.builtins import unicode
from apache_beam import coders
from apache_beam.io.iobase import Read
from apache_beam.io.iobase import Write
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
@@ -147,14 +145,13 @@ class ReadFromPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.
- def __init__(
- self,
- topic=None, # type: Optional[str]
- subscription=None, # type: Optional[str]
- id_label=None, # type: Optional[str]
- with_attributes=False, # type: bool
- timestamp_attribute=None # type: Optional[str]
- ):
+ def __init__(self,
+ topic=None, # type: Optional[str]
+ subscription=None, # type: Optional[str]
+ id_label=None, # type: Optional[str]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None # type: Optional[str]
+ ):
# type: (...) -> None
"""Initializes ``ReadFromPubSub``.
@@ -206,31 +203,10 @@ class ReadFromPubSub(PTransform):
pcoll.element_type = PubsubMessage
return pcoll
- def _pubsub_read_payload(self):
- return beam_runner_api_pb2.PubSubReadPayload(
- topic=self._source.full_topic,
- subscription=self._source.full_subscription,
- timestamp_attribute=self._source.timestamp_attribute,
- id_attribute=self._source.id_label,
- with_attributes=self.with_attributes,
- serialized_attribute_fn='')
-
def to_runner_api_parameter(self, context):
- payload = self._pubsub_read_payload()
- return (common_urns.composites.PUBSUB_READ.urn, payload)
-
- @staticmethod
- @PTransform.register_urn(
- common_urns.composites.PUBSUB_READ.urn,
- beam_runner_api_pb2.PubSubReadPayload)
- def from_runner_api_parameter(
- unused_ptransform, pubsub_read_payload, unused_context):
- return ReadFromPubSub(
- topic=pubsub_read_payload.topic,
- subscription=pubsub_read_payload.subscription,
- id_label=pubsub_read_payload.id_attribute,
- with_attributes=pubsub_read_payload.with_attributes,
- timestamp_attribute=pubsub_read_payload.timestamp_attribute)
+ # Required as this is identified by type in PTransformOverrides.
+ # TODO(BEAM-3812): Use an actual URN here.
+ return self.to_runner_api_pickled(context)
@deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
@@ -284,13 +260,12 @@ class WriteToPubSub(PTransform):
# Implementation note: This ``PTransform`` is overridden by Directrunner.
- def __init__(
- self,
- topic, # type: str
- with_attributes=False, # type: bool
- id_label=None, # type: Optional[str]
- timestamp_attribute=None # type: Optional[str]
- ):
+ def __init__(self,
+ topic, # type: str
+ with_attributes=False, # type: bool
+ id_label=None, # type: Optional[str]
+ timestamp_attribute=None # type: Optional[str]
+ ):
# type: (...) -> None
"""Initializes ``WriteToPubSub``.
@@ -333,32 +308,10 @@ class WriteToPubSub(PTransform):
pcoll.element_type = bytes
return pcoll | Write(self._sink)
- def _pubsub_write_payload(self):
- return beam_runner_api_pb2.PubSubWritePayload(
- topic=self._sink.full_topic,
- timestamp_attribute=self._sink.timestamp_attribute,
- id_attribute=self._sink.id_label,
- with_attributes=self.with_attributes,
- serialized_attribute_fn='')
-
def to_runner_api_parameter(self, context):
- payload = self._pubsub_write_payload()
- sink_coder = coders.WindowedValueCoder(
- self._sink.coder, coders.coders.GlobalWindowCoder())
- payload.coder_id = context.coders.get_id(sink_coder)
- return (common_urns.composites.PUBSUB_WRITE.urn, payload)
-
- @staticmethod
- @PTransform.register_urn(
- common_urns.composites.PUBSUB_WRITE.urn,
- beam_runner_api_pb2.PubSubWritePayload)
- def from_runner_api_parameter(
- unused_ptransform, pubsub_write_payload, unused_context):
- return WriteToPubSub(
- topic=pubsub_write_payload.topic,
- with_attributes=pubsub_write_payload.with_attributes,
- id_label=pubsub_write_payload.id_attribute,
- timestamp_attribute=pubsub_write_payload.timestamp_attribute)
+ # Required as this is identified by type in PTransformOverrides.
+ # TODO(BEAM-3812): Use an actual URN here.
+ return self.to_runner_api_pickled(context)
PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
@@ -399,14 +352,14 @@ class _PubSubSource(dataflow_io.NativeSource):
with_attributes: If False, will fetch just message data. Otherwise,
fetches ``PubsubMessage`` protobufs.
"""
- def __init__(
- self,
- topic=None, # type: Optional[str]
- subscription=None, # type: Optional[str]
- id_label=None, # type: Optional[str]
- with_attributes=False, # type: bool
- timestamp_attribute=None # type: Optional[str]
- ):
+
+ def __init__(self,
+ topic=None, # type: Optional[str]
+ subscription=None, # type: Optional[str]
+ id_label=None, # type: Optional[str]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None # type: Optional[str]
+ ):
self.coder = coders.BytesCoder()
self.full_topic = topic
self.full_subscription = subscription
@@ -459,13 +412,13 @@ class _PubSubSink(dataflow_io.NativeSink):
This ``NativeSource`` is overridden by a native Pubsub implementation.
"""
- def __init__(
- self,
- topic, # type: str
- id_label, # type: Optional[str]
- with_attributes, # type: bool
- timestamp_attribute # type: Optional[str]
- ):
+
+ def __init__(self,
+ topic, # type: str
+ id_label, # type: Optional[str]
+ with_attributes, # type: bool
+ timestamp_attribute # type: Optional[str]
+ ):
self.coder = coders.BytesCoder()
self.full_topic = topic
self.id_label = id_label
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 9a4fc3e..703b40f 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -41,9 +41,6 @@ from apache_beam.io.gcp.pubsub import _PubSubSink
from apache_beam.io.gcp.pubsub import _PubSubSource
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners import pipeline_context
from apache_beam.runners.direct import transform_evaluator
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
from apache_beam.runners.direct.direct_runner import _get_transform_overrides
@@ -57,7 +54,6 @@ from apache_beam.transforms import window
from apache_beam.transforms.core import Create
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import proto_utils
from apache_beam.utils import timestamp
# Protect against environments where the PubSub library is not available.
@@ -583,59 +579,6 @@ class TestReadFromPubSub(unittest.TestCase):
p | ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, 'a_label'))
- def test_runner_api_transformation_with_topic(self, unused_mock_pubsub):
- context = pipeline_context.PipelineContext()
- transform = ReadFromPubSub(
- topic='projects/fakeprj/topics/a_topic',
- subscription=None,
- id_label='a_label',
- timestamp_attribute='b_label',
- with_attributes=True)
- proto_transform = transform.to_runner_api(context)
- self.assertEqual(
- common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
- pubsub_read_payload = (
- proto_utils.parse_Bytes(
- proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
- self.assertEqual(
- 'projects/fakeprj/topics/a_topic', pubsub_read_payload.topic)
- self.assertEqual('a_label', pubsub_read_payload.id_attribute)
- self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
- self.assertTrue(pubsub_read_payload.with_attributes)
- self.assertEqual('', pubsub_read_payload.subscription)
-
- transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
- None, pubsub_read_payload, None)
- self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
- def test_runner_api_tranformation_with_subscription(self, unused_mock_pubsub):
- context = pipeline_context.PipelineContext()
- transform = ReadFromPubSub(
- topic=None,
- subscription='projects/fakeprj/subscriptions/a_subscription',
- id_label='a_label',
- timestamp_attribute='b_label',
- with_attributes=True)
- proto_transform = transform.to_runner_api(context)
- self.assertEqual(
- common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
- pubsub_read_payload = (
- proto_utils.parse_Bytes(
- proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
- self.assertEqual(
- 'projects/fakeprj/subscriptions/a_subscription',
- pubsub_read_payload.subscription)
- self.assertEqual('a_label', pubsub_read_payload.id_attribute)
- self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
- self.assertTrue(pubsub_read_payload.with_attributes)
- self.assertEqual('', pubsub_read_payload.topic)
-
- transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
- None, pubsub_read_payload, None)
- self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('google.cloud.pubsub.PublisherClient')
@@ -728,30 +671,6 @@ class TestWriteToPubSub(unittest.TestCase):
'projects/fakeprj/topics/a_topic',
timestamp_attribute='timestamp'))
- def test_runner_api_transformation(self, unused_mock_pubsub):
- context = pipeline_context.PipelineContext()
- transform = WriteToPubSub(
- topic='projects/fakeprj/topics/a_topic',
- id_label='a_label',
- timestamp_attribute='b_label',
- with_attributes=True)
- proto_transform = transform.to_runner_api(context)
- self.assertEqual(
- common_urns.composites.PUBSUB_WRITE.urn, proto_transform.urn)
-
- pubsub_write_payload = (
- proto_utils.parse_Bytes(
- proto_transform.payload, beam_runner_api_pb2.PubSubWritePayload))
- self.assertEqual(
- 'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
- self.assertEqual('a_label', pubsub_write_payload.id_attribute)
- self.assertEqual('b_label', pubsub_write_payload.timestamp_attribute)
- self.assertTrue(pubsub_write_payload.with_attributes)
-
- transform_from_proto = WriteToPubSub.from_runner_api_parameter(
- None, pubsub_write_payload, None)
- self.assertTrue(isinstance(transform_from_proto, WriteToPubSub))
-
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)