You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/09/23 14:33:45 UTC

[beam] 01/01: Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-12760-add_pubsub_payload
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 967d7288cd5069f21617c98a9156dd4076bb9fb9
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Sep 23 07:32:56 2020 -0700

    Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"
    
    This reverts commit a19d06eaf5d9639df330915ec175c8c8082c4980.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |  54 ----------
 sdks/python/apache_beam/io/gcp/pubsub.py           | 115 ++++++---------------
 sdks/python/apache_beam/io/gcp/pubsub_test.py      |  81 ---------------
 3 files changed, 34 insertions(+), 216 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 848522b..0298b3f 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -324,12 +324,6 @@ message StandardPTransforms {
 
     // Less well-known. Payload: WriteFilesPayload.
     WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
-
-    // Payload: PubSubReadPayload.
-    PUBSUB_READ = 4 [(beam_urn) = "beam:transform:pubsub_read:v1"];
-
-    // Payload: PubSubWritePayload.
-    PUBSUB_WRITE = 5 [(beam_urn) = "beam:transform:pubsub_write:v1"];
   }
   // Payload for all of these: CombinePayload
   enum CombineComponents {
@@ -656,54 +650,6 @@ message WriteFilesPayload {
   map<string, SideInput> side_inputs = 5;
 }
 
-// Payload used by Google Cloud Pub/Sub read transform.
-// This can be used by runners that wish to override Beam Pub/Sub read transform
-// with a native implementation.
-message PubSubReadPayload {
-
-  // Topic to read from. Exactly one of topic or subscription should be set.
-  string topic = 1;
-
-  // Subscription to read from. Exactly one of topic or subscription should be set.
-  string subscription = 2;
-
-  // Attribute that provides element timestamps.
-  string timestamp_attribute = 3;
-
-  // Attribute to be used for uniquely identifying messages.
-  string id_attribute = 4;
-
-  // If true, reads Pub/Sub payload as well as attributes. If false, reads only the payload.
-  bool with_attributes = 5;
-
-  // JSON serialized parse function for attibutes.
-  string serialized_attribute_fn = 6;
-}
-
-// Payload used by Google Cloud Pub/Sub write transform.
-// This can be used by runners that wish to override Beam Pub/Sub write transform
-// with a native implementation.
-message PubSubWritePayload {
-
-  // Topic to write to.
-  string topic = 1;
-
-  // Attribute that provides element timestamps.
-  string timestamp_attribute = 2;
-
-  // Attribute that uniquely identify messages.
-  string id_attribute = 3;
-
-  // If true, writes Pub/Sub payload as well as attributes. If false, reads only the payload.
-  bool with_attributes = 4;
-
-  // JSON serialized parse function for attributes.
-  string serialized_attribute_fn = 5;
-
-  // Coder ID for the Pub/Sub sink (if the runner requires it).
-  string coder_id = 6;
-}
-
 // A coder, the binary format for serialization and deserialization of data in
 // a pipeline.
 message Coder {
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index af5fd9b..b0f8bdf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,8 +38,6 @@ from past.builtins import unicode
 from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
@@ -147,14 +145,13 @@ class ReadFromPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(
-      self,
-      topic=None,  # type: Optional[str]
-      subscription=None,  # type: Optional[str]
-      id_label=None,  # type: Optional[str]
-      with_attributes=False,  # type: bool
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+  def __init__(self,
+               topic=None,  # type: Optional[str]
+               subscription=None,  # type: Optional[str]
+               id_label=None,  # type: Optional[str]
+               with_attributes=False,  # type: bool
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     # type: (...) -> None
 
     """Initializes ``ReadFromPubSub``.
@@ -206,31 +203,10 @@ class ReadFromPubSub(PTransform):
       pcoll.element_type = PubsubMessage
     return pcoll
 
-  def _pubsub_read_payload(self):
-    return beam_runner_api_pb2.PubSubReadPayload(
-        topic=self._source.full_topic,
-        subscription=self._source.full_subscription,
-        timestamp_attribute=self._source.timestamp_attribute,
-        id_attribute=self._source.id_label,
-        with_attributes=self.with_attributes,
-        serialized_attribute_fn='')
-
   def to_runner_api_parameter(self, context):
-    payload = self._pubsub_read_payload()
-    return (common_urns.composites.PUBSUB_READ.urn, payload)
-
-  @staticmethod
-  @PTransform.register_urn(
-      common_urns.composites.PUBSUB_READ.urn,
-      beam_runner_api_pb2.PubSubReadPayload)
-  def from_runner_api_parameter(
-      unused_ptransform, pubsub_read_payload, unused_context):
-    return ReadFromPubSub(
-        topic=pubsub_read_payload.topic,
-        subscription=pubsub_read_payload.subscription,
-        id_label=pubsub_read_payload.id_attribute,
-        with_attributes=pubsub_read_payload.with_attributes,
-        timestamp_attribute=pubsub_read_payload.timestamp_attribute)
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
 
 
 @deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
@@ -284,13 +260,12 @@ class WriteToPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(
-      self,
-      topic,  # type: str
-      with_attributes=False,  # type: bool
-      id_label=None,  # type: Optional[str]
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+  def __init__(self,
+               topic,  # type: str
+               with_attributes=False,  # type: bool
+               id_label=None,  # type: Optional[str]
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     # type: (...) -> None
 
     """Initializes ``WriteToPubSub``.
@@ -333,32 +308,10 @@ class WriteToPubSub(PTransform):
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)
 
-  def _pubsub_write_payload(self):
-    return beam_runner_api_pb2.PubSubWritePayload(
-        topic=self._sink.full_topic,
-        timestamp_attribute=self._sink.timestamp_attribute,
-        id_attribute=self._sink.id_label,
-        with_attributes=self.with_attributes,
-        serialized_attribute_fn='')
-
   def to_runner_api_parameter(self, context):
-    payload = self._pubsub_write_payload()
-    sink_coder = coders.WindowedValueCoder(
-        self._sink.coder, coders.coders.GlobalWindowCoder())
-    payload.coder_id = context.coders.get_id(sink_coder)
-    return (common_urns.composites.PUBSUB_WRITE.urn, payload)
-
-  @staticmethod
-  @PTransform.register_urn(
-      common_urns.composites.PUBSUB_WRITE.urn,
-      beam_runner_api_pb2.PubSubWritePayload)
-  def from_runner_api_parameter(
-      unused_ptransform, pubsub_write_payload, unused_context):
-    return WriteToPubSub(
-        topic=pubsub_write_payload.topic,
-        with_attributes=pubsub_write_payload.with_attributes,
-        id_label=pubsub_write_payload.id_attribute,
-        timestamp_attribute=pubsub_write_payload.timestamp_attribute)
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
 
 
 PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
@@ -399,14 +352,14 @@ class _PubSubSource(dataflow_io.NativeSource):
     with_attributes: If False, will fetch just message data. Otherwise,
       fetches ``PubsubMessage`` protobufs.
   """
-  def __init__(
-      self,
-      topic=None,  # type: Optional[str]
-      subscription=None,  # type: Optional[str]
-      id_label=None,  # type: Optional[str]
-      with_attributes=False,  # type: bool
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+
+  def __init__(self,
+               topic=None,  # type: Optional[str]
+               subscription=None,  # type: Optional[str]
+               id_label=None,  # type: Optional[str]
+               with_attributes=False,  # type: bool
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.full_subscription = subscription
@@ -459,13 +412,13 @@ class _PubSubSink(dataflow_io.NativeSink):
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
   """
-  def __init__(
-      self,
-      topic,  # type: str
-      id_label,  # type: Optional[str]
-      with_attributes,  # type: bool
-      timestamp_attribute  # type: Optional[str]
-  ):
+
+  def __init__(self,
+               topic,  # type: str
+               id_label,  # type: Optional[str]
+               with_attributes,  # type: bool
+               timestamp_attribute  # type: Optional[str]
+              ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.id_label = id_label
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 9a4fc3e..703b40f 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -41,9 +41,6 @@ from apache_beam.io.gcp.pubsub import _PubSubSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners import pipeline_context
 from apache_beam.runners.direct import transform_evaluator
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
@@ -57,7 +54,6 @@ from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import proto_utils
 from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
@@ -583,59 +579,6 @@ class TestReadFromPubSub(unittest.TestCase):
             p | ReadFromPubSub(
                 'projects/fakeprj/topics/a_topic', None, 'a_label'))
 
-  def test_runner_api_transformation_with_topic(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = ReadFromPubSub(
-        topic='projects/fakeprj/topics/a_topic',
-        subscription=None,
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
-    pubsub_read_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
-    self.assertEqual(
-        'projects/fakeprj/topics/a_topic', pubsub_read_payload.topic)
-    self.assertEqual('a_label', pubsub_read_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
-    self.assertTrue(pubsub_read_payload.with_attributes)
-    self.assertEqual('', pubsub_read_payload.subscription)
-
-    transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
-        None, pubsub_read_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
-  def test_runner_api_tranformation_with_subscription(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = ReadFromPubSub(
-        topic=None,
-        subscription='projects/fakeprj/subscriptions/a_subscription',
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
-    pubsub_read_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
-    self.assertEqual(
-        'projects/fakeprj/subscriptions/a_subscription',
-        pubsub_read_payload.subscription)
-    self.assertEqual('a_label', pubsub_read_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
-    self.assertTrue(pubsub_read_payload.with_attributes)
-    self.assertEqual('', pubsub_read_payload.topic)
-
-    transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
-        None, pubsub_read_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 @mock.patch('google.cloud.pubsub.PublisherClient')
@@ -728,30 +671,6 @@ class TestWriteToPubSub(unittest.TestCase):
                 'projects/fakeprj/topics/a_topic',
                 timestamp_attribute='timestamp'))
 
-  def test_runner_api_transformation(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = WriteToPubSub(
-        topic='projects/fakeprj/topics/a_topic',
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_WRITE.urn, proto_transform.urn)
-
-    pubsub_write_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubWritePayload))
-    self.assertEqual(
-        'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
-    self.assertEqual('a_label', pubsub_write_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_write_payload.timestamp_attribute)
-    self.assertTrue(pubsub_write_payload.with_attributes)
-
-    transform_from_proto = WriteToPubSub.from_runner_api_parameter(
-        None, pubsub_write_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, WriteToPubSub))
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)