You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/09/23 14:33:44 UTC

[beam] branch revert-12760-add_pubsub_payload created (now 967d728)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a change to branch revert-12760-add_pubsub_payload
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 967d728  Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"

This branch includes the following new commits:

     new 967d728  Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-12760-add_pubsub_payload
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 967d7288cd5069f21617c98a9156dd4076bb9fb9
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Sep 23 07:32:56 2020 -0700

    Revert "[BEAM-10861] Adds URNs and payloads to PubSub transforms to allow runner-native overrides (#12760)"
    
    This reverts commit a19d06eaf5d9639df330915ec175c8c8082c4980.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |  54 ----------
 sdks/python/apache_beam/io/gcp/pubsub.py           | 115 ++++++---------------
 sdks/python/apache_beam/io/gcp/pubsub_test.py      |  81 ---------------
 3 files changed, 34 insertions(+), 216 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 848522b..0298b3f 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -324,12 +324,6 @@ message StandardPTransforms {
 
     // Less well-known. Payload: WriteFilesPayload.
     WRITE_FILES = 3 [(beam_urn) = "beam:transform:write_files:v1"];
-
-    // Payload: PubSubReadPayload.
-    PUBSUB_READ = 4 [(beam_urn) = "beam:transform:pubsub_read:v1"];
-
-    // Payload: PubSubWritePayload.
-    PUBSUB_WRITE = 5 [(beam_urn) = "beam:transform:pubsub_write:v1"];
   }
   // Payload for all of these: CombinePayload
   enum CombineComponents {
@@ -656,54 +650,6 @@ message WriteFilesPayload {
   map<string, SideInput> side_inputs = 5;
 }
 
-// Payload used by Google Cloud Pub/Sub read transform.
-// This can be used by runners that wish to override Beam Pub/Sub read transform
-// with a native implementation.
-message PubSubReadPayload {
-
-  // Topic to read from. Exactly one of topic or subscription should be set.
-  string topic = 1;
-
-  // Subscription to read from. Exactly one of topic or subscription should be set.
-  string subscription = 2;
-
-  // Attribute that provides element timestamps.
-  string timestamp_attribute = 3;
-
-  // Attribute to be used for uniquely identifying messages.
-  string id_attribute = 4;
-
-  // If true, reads Pub/Sub payload as well as attributes. If false, reads only the payload.
-  bool with_attributes = 5;
-
-  // JSON serialized parse function for attibutes.
-  string serialized_attribute_fn = 6;
-}
-
-// Payload used by Google Cloud Pub/Sub write transform.
-// This can be used by runners that wish to override Beam Pub/Sub write transform
-// with a native implementation.
-message PubSubWritePayload {
-
-  // Topic to write to.
-  string topic = 1;
-
-  // Attribute that provides element timestamps.
-  string timestamp_attribute = 2;
-
-  // Attribute that uniquely identify messages.
-  string id_attribute = 3;
-
-  // If true, writes Pub/Sub payload as well as attributes. If false, reads only the payload.
-  bool with_attributes = 4;
-
-  // JSON serialized parse function for attributes.
-  string serialized_attribute_fn = 5;
-
-  // Coder ID for the Pub/Sub sink (if the runner requires it).
-  string coder_id = 6;
-}
-
 // A coder, the binary format for serialization and deserialization of data in
 // a pipeline.
 message Coder {
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index af5fd9b..b0f8bdf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -38,8 +38,6 @@ from past.builtins import unicode
 from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
@@ -147,14 +145,13 @@ class ReadFromPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(
-      self,
-      topic=None,  # type: Optional[str]
-      subscription=None,  # type: Optional[str]
-      id_label=None,  # type: Optional[str]
-      with_attributes=False,  # type: bool
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+  def __init__(self,
+               topic=None,  # type: Optional[str]
+               subscription=None,  # type: Optional[str]
+               id_label=None,  # type: Optional[str]
+               with_attributes=False,  # type: bool
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     # type: (...) -> None
 
     """Initializes ``ReadFromPubSub``.
@@ -206,31 +203,10 @@ class ReadFromPubSub(PTransform):
       pcoll.element_type = PubsubMessage
     return pcoll
 
-  def _pubsub_read_payload(self):
-    return beam_runner_api_pb2.PubSubReadPayload(
-        topic=self._source.full_topic,
-        subscription=self._source.full_subscription,
-        timestamp_attribute=self._source.timestamp_attribute,
-        id_attribute=self._source.id_label,
-        with_attributes=self.with_attributes,
-        serialized_attribute_fn='')
-
   def to_runner_api_parameter(self, context):
-    payload = self._pubsub_read_payload()
-    return (common_urns.composites.PUBSUB_READ.urn, payload)
-
-  @staticmethod
-  @PTransform.register_urn(
-      common_urns.composites.PUBSUB_READ.urn,
-      beam_runner_api_pb2.PubSubReadPayload)
-  def from_runner_api_parameter(
-      unused_ptransform, pubsub_read_payload, unused_context):
-    return ReadFromPubSub(
-        topic=pubsub_read_payload.topic,
-        subscription=pubsub_read_payload.subscription,
-        id_label=pubsub_read_payload.id_attribute,
-        with_attributes=pubsub_read_payload.with_attributes,
-        timestamp_attribute=pubsub_read_payload.timestamp_attribute)
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
 
 
 @deprecated(since='2.7.0', extra_message='Use ReadFromPubSub instead.')
@@ -284,13 +260,12 @@ class WriteToPubSub(PTransform):
 
   # Implementation note: This ``PTransform`` is overridden by Directrunner.
 
-  def __init__(
-      self,
-      topic,  # type: str
-      with_attributes=False,  # type: bool
-      id_label=None,  # type: Optional[str]
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+  def __init__(self,
+               topic,  # type: str
+               with_attributes=False,  # type: bool
+               id_label=None,  # type: Optional[str]
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     # type: (...) -> None
 
     """Initializes ``WriteToPubSub``.
@@ -333,32 +308,10 @@ class WriteToPubSub(PTransform):
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)
 
-  def _pubsub_write_payload(self):
-    return beam_runner_api_pb2.PubSubWritePayload(
-        topic=self._sink.full_topic,
-        timestamp_attribute=self._sink.timestamp_attribute,
-        id_attribute=self._sink.id_label,
-        with_attributes=self.with_attributes,
-        serialized_attribute_fn='')
-
   def to_runner_api_parameter(self, context):
-    payload = self._pubsub_write_payload()
-    sink_coder = coders.WindowedValueCoder(
-        self._sink.coder, coders.coders.GlobalWindowCoder())
-    payload.coder_id = context.coders.get_id(sink_coder)
-    return (common_urns.composites.PUBSUB_WRITE.urn, payload)
-
-  @staticmethod
-  @PTransform.register_urn(
-      common_urns.composites.PUBSUB_WRITE.urn,
-      beam_runner_api_pb2.PubSubWritePayload)
-  def from_runner_api_parameter(
-      unused_ptransform, pubsub_write_payload, unused_context):
-    return WriteToPubSub(
-        topic=pubsub_write_payload.topic,
-        with_attributes=pubsub_write_payload.with_attributes,
-        id_label=pubsub_write_payload.id_attribute,
-        timestamp_attribute=pubsub_write_payload.timestamp_attribute)
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
 
 
 PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
@@ -399,14 +352,14 @@ class _PubSubSource(dataflow_io.NativeSource):
     with_attributes: If False, will fetch just message data. Otherwise,
       fetches ``PubsubMessage`` protobufs.
   """
-  def __init__(
-      self,
-      topic=None,  # type: Optional[str]
-      subscription=None,  # type: Optional[str]
-      id_label=None,  # type: Optional[str]
-      with_attributes=False,  # type: bool
-      timestamp_attribute=None  # type: Optional[str]
-  ):
+
+  def __init__(self,
+               topic=None,  # type: Optional[str]
+               subscription=None,  # type: Optional[str]
+               id_label=None,  # type: Optional[str]
+               with_attributes=False,  # type: bool
+               timestamp_attribute=None  # type: Optional[str]
+              ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.full_subscription = subscription
@@ -459,13 +412,13 @@ class _PubSubSink(dataflow_io.NativeSink):
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
   """
-  def __init__(
-      self,
-      topic,  # type: str
-      id_label,  # type: Optional[str]
-      with_attributes,  # type: bool
-      timestamp_attribute  # type: Optional[str]
-  ):
+
+  def __init__(self,
+               topic,  # type: str
+               id_label,  # type: Optional[str]
+               with_attributes,  # type: bool
+               timestamp_attribute  # type: Optional[str]
+              ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.id_label = id_label
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 9a4fc3e..703b40f 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -41,9 +41,6 @@ from apache_beam.io.gcp.pubsub import _PubSubSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners import pipeline_context
 from apache_beam.runners.direct import transform_evaluator
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
 from apache_beam.runners.direct.direct_runner import _get_transform_overrides
@@ -57,7 +54,6 @@ from apache_beam.transforms import window
 from apache_beam.transforms.core import Create
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils import proto_utils
 from apache_beam.utils import timestamp
 
 # Protect against environments where the PubSub library is not available.
@@ -583,59 +579,6 @@ class TestReadFromPubSub(unittest.TestCase):
             p | ReadFromPubSub(
                 'projects/fakeprj/topics/a_topic', None, 'a_label'))
 
-  def test_runner_api_transformation_with_topic(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = ReadFromPubSub(
-        topic='projects/fakeprj/topics/a_topic',
-        subscription=None,
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
-    pubsub_read_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
-    self.assertEqual(
-        'projects/fakeprj/topics/a_topic', pubsub_read_payload.topic)
-    self.assertEqual('a_label', pubsub_read_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
-    self.assertTrue(pubsub_read_payload.with_attributes)
-    self.assertEqual('', pubsub_read_payload.subscription)
-
-    transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
-        None, pubsub_read_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
-  def test_runner_api_tranformation_with_subscription(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = ReadFromPubSub(
-        topic=None,
-        subscription='projects/fakeprj/subscriptions/a_subscription',
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_READ.urn, proto_transform.urn)
-
-    pubsub_read_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubReadPayload))
-    self.assertEqual(
-        'projects/fakeprj/subscriptions/a_subscription',
-        pubsub_read_payload.subscription)
-    self.assertEqual('a_label', pubsub_read_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_read_payload.timestamp_attribute)
-    self.assertTrue(pubsub_read_payload.with_attributes)
-    self.assertEqual('', pubsub_read_payload.topic)
-
-    transform_from_proto = ReadFromPubSub.from_runner_api_parameter(
-        None, pubsub_read_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, ReadFromPubSub))
-
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 @mock.patch('google.cloud.pubsub.PublisherClient')
@@ -728,30 +671,6 @@ class TestWriteToPubSub(unittest.TestCase):
                 'projects/fakeprj/topics/a_topic',
                 timestamp_attribute='timestamp'))
 
-  def test_runner_api_transformation(self, unused_mock_pubsub):
-    context = pipeline_context.PipelineContext()
-    transform = WriteToPubSub(
-        topic='projects/fakeprj/topics/a_topic',
-        id_label='a_label',
-        timestamp_attribute='b_label',
-        with_attributes=True)
-    proto_transform = transform.to_runner_api(context)
-    self.assertEqual(
-        common_urns.composites.PUBSUB_WRITE.urn, proto_transform.urn)
-
-    pubsub_write_payload = (
-        proto_utils.parse_Bytes(
-            proto_transform.payload, beam_runner_api_pb2.PubSubWritePayload))
-    self.assertEqual(
-        'projects/fakeprj/topics/a_topic', pubsub_write_payload.topic)
-    self.assertEqual('a_label', pubsub_write_payload.id_attribute)
-    self.assertEqual('b_label', pubsub_write_payload.timestamp_attribute)
-    self.assertTrue(pubsub_write_payload.with_attributes)
-
-    transform_from_proto = WriteToPubSub.from_runner_api_parameter(
-        None, pubsub_write_payload, None)
-    self.assertTrue(isinstance(transform_from_proto, WriteToPubSub))
-
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)