Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/11/01 19:26:00 UTC

[jira] [Work logged] (BEAM-13052) Pub/Sub Lite support for Python SDK

     [ https://issues.apache.org/jira/browse/BEAM-13052?focusedWorklogId=672805&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-672805 ]

ASF GitHub Bot logged work on BEAM-13052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/21 19:25
            Start Date: 01/Nov/21 19:25
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #15817:
URL: https://github.com/apache/beam/pull/15817#discussion_r740467911



##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -310,6 +311,19 @@ def encode(self, value):
     return value.SerializePartialToString(deterministic=True)
 
 
+class ProtoPlusCoderImpl(SimpleCoderImpl):

Review comment:
       Do you think this could affect the performance characteristics of the existing Python GCP connectors?

##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -1014,6 +1016,42 @@ def as_deterministic_coder(self, step_label, error_message=None):
     return self
 
 
+class ProtoPlusCoder(FastCoder):
+  """A Coder for Google Protocol Buffers wrapped using the proto-plus library.
+
+  ProtoPlusCoder is registered in the global CoderRegistry as the default coder
+  for any proto.Message object.
+  """
+  def __init__(self, proto_plus_message_type):
+    # type: (Type[proto.Message]) -> None
+    self.proto_plus_message_type = proto_plus_message_type
+
+  def _create_impl(self):
+    return coder_impl.ProtoPlusCoderImpl(self.proto_plus_message_type)
+
+  def is_deterministic(self):
+    return True

Review comment:
       Is this deterministic? It looks like ProtoCoder is not deterministic.
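
For context on why determinism matters here, the following is a minimal sketch of the failure mode that the existing test around `coder.encode(mm_forward)` and `coder.encode(mm_reverse)` guards against. It is a toy stand-in that uses JSON rather than the protobuf wire format, and both function names are hypothetical; it only illustrates that two equal maps can serialize to different bytes unless the entry order is fixed.

```python
import json


def encode_in_insertion_order(mapping):
    """Toy serializer: writes map entries in whatever order the dict holds them."""
    return json.dumps(list(mapping.items())).encode("utf-8")


def encode_with_sorted_keys(mapping):
    """Toy serializer: sorts entries by key first, so equal maps give equal bytes."""
    return json.dumps(sorted(mapping.items())).encode("utf-8")


# Two maps that compare equal but were built in opposite orders.
forward = {"a": 1, "b": 2}
reverse = {"b": 2, "a": 1}

same_value = forward == reverse
unordered_bytes_match = (
    encode_in_insertion_order(forward) == encode_in_insertion_order(reverse))
sorted_bytes_match = (
    encode_with_sorted_keys(forward) == encode_with_sorted_keys(reverse))
```

A coder that reports `is_deterministic()` as True is promising the second behavior for every value it encodes, which is why the claim deserves a test with map fields.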

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -310,6 +311,19 @@ def encode(self, value):
     return value.SerializePartialToString(deterministic=True)
 
 
+class ProtoPlusCoderImpl(SimpleCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+  def __init__(self, proto_plus_type):
+    # type: (Type[proto.Message]) -> None
+    self.proto_plus_type = proto_plus_type
+
+  def encode(self, value):
+    return value._pb.SerializePartialToString(deterministic=True)
+
+  def decode(self, value):
+    return self.proto_plus_type.deserialize(value)

Review comment:
       To confirm: the API used here differs from the one in ProtoCoder above (which uses 'ParseFromString')?

##########
File path: sdks/python/apache_beam/coders/coders_test.py
##########
@@ -109,6 +111,45 @@ def test_deterministic_proto_coder_determinism(self):
       self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse))
 
 
+class ProtoPlusMessageB(proto.Message):
+  field1 = proto.Field(proto.BOOL, number=1)
+
+
+class ProtoPlusMessageA(proto.Message):
+  field1 = proto.Field(proto.STRING, number=1)
+  field2 = proto.RepeatedField(ProtoPlusMessageB, number=2)
+
+
+class ProtoPlusMessageWithMap(proto.Message):
+  field1 = proto.MapField(proto.STRING, ProtoPlusMessageA, number=1)
+
+
+class ProtoPlusCoderTest(unittest.TestCase):
+  def test_proto_plus_coder(self):

Review comment:
       Should we also add tests that cover more complex field types (for example, maps)?

##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -1014,6 +1016,42 @@ def as_deterministic_coder(self, step_label, error_message=None):
     return self
 
 
+class ProtoPlusCoder(FastCoder):
+  """A Coder for Google Protocol Buffers wrapped using the proto-plus library.
+
+  ProtoPlusCoder is registered in the global CoderRegistry as the default coder
+  for any proto.Message object.
+  """
+  def __init__(self, proto_plus_message_type):
+    # type: (Type[proto.Message]) -> None
+    self.proto_plus_message_type = proto_plus_message_type
+
+  def _create_impl(self):
+    return coder_impl.ProtoPlusCoderImpl(self.proto_plus_message_type)
+
+  def is_deterministic(self):
+    return True
+
+  def __eq__(self, other):
+    return (
+        type(self) == type(other) and
+        self.proto_plus_message_type == other.proto_plus_message_type)
+
+  def __hash__(self):
+    return hash(self.proto_plus_message_type)
+
+  @classmethod
+  def from_type_hint(cls, typehint, unused_registry):
+    if issubclass(typehint, proto.Message):

Review comment:
       To clarify: are such messages currently encoded using the default coder?
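
To make the question concrete, here is a rough sketch of the lookup behavior being asked about, assuming a registry that falls back to a generic coder when no specific one is registered. Every name in it (`Message`, `MessageCoder`, `PickleFallbackCoder`, `ToyRegistry`, `Event`) is a hypothetical stand-in written for illustration; it is not Beam's CoderRegistry and makes no claim about how Beam resolves coders.

```python
import pickle


class Message:
    """Hypothetical stand-in for proto.Message."""


class PickleFallbackCoder:
    """Hypothetical generic coder used when nothing more specific is registered."""
    def encode(self, value):
        return pickle.dumps(value)

    def decode(self, data):
        return pickle.loads(data)


class MessageCoder:
    """Hypothetical type-specific coder, keyed on the message class."""
    def __init__(self, message_type):
        self.message_type = message_type

    def __eq__(self, other):
        return (
            type(self) == type(other) and
            self.message_type == other.message_type)

    def __hash__(self):
        return hash(self.message_type)

    @classmethod
    def from_type_hint(cls, typehint, unused_registry):
        # issubclass() raises TypeError for non-class arguments, so check first.
        if isinstance(typehint, type) and issubclass(typehint, Message):
            return cls(typehint)
        raise ValueError('Expected a subclass of Message, got %r' % (typehint,))


class ToyRegistry:
    """Maps base types to coder classes and falls back when none match."""
    def __init__(self):
        self._coders = {}
        self._fallback = PickleFallbackCoder()

    def register(self, base_type, coder_class):
        self._coders[base_type] = coder_class

    def get_coder(self, typehint):
        if isinstance(typehint, type):
            # Walk the MRO so subclasses pick up the coder of a registered base.
            for klass in typehint.__mro__:
                if klass in self._coders:
                    return self._coders[klass].from_type_hint(typehint, self)
        return self._fallback


class Event(Message):
    """Hypothetical message subclass used to exercise the lookup."""


registry = ToyRegistry()
coder_before = registry.get_coder(Event)   # nothing registered yet
registry.register(Message, MessageCoder)
coder_after = registry.get_coder(Event)    # now resolved through the base class
```

In this sketch, registering a coder for the base class changes which coder every subclass receives, which is the kind of behavior change the question above is probing.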





Issue Time Tracking
-------------------

    Worklog Id:     (was: 672805)
    Time Spent: 2h 50m  (was: 2h 40m)

> Pub/Sub Lite support for Python SDK
> -----------------------------------
>
>                 Key: BEAM-13052
>                 URL: https://issues.apache.org/jira/browse/BEAM-13052
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-py-gcp
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Daniel Collins
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Potentially we can make existing Java source/sink available to Python as cross-language transforms.
>  
> cc: [~dpcollins-google]


