Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/28 17:04:06 UTC

[GitHub] [beam] yifanmai commented on a change in pull request #12707: Identify pair-with-none DoFn in CombineGlobally with a named URN.

yifanmai commented on a change in pull request #12707:
URL: https://github.com/apache/beam/pull/12707#discussion_r479428934



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1493,6 +1488,72 @@ def expand(self, pcoll):
         pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
 
 
+class DoFnInfo(object):
+  """This class represents the state in the ParDoPayload's function spec,
+  which is the actual DoFn together with some data required for invoking it.
+  """
+  @staticmethod
+  def register_stateless_dofn(urn):
+    def wrapper(cls):
+      StatelessDoFnInfo.REGISTERED_DOFNS[urn] = cls
+      cls._stateless_dofn_urn = urn
+      return cls
+
+    return wrapper
+
+  @classmethod
+  def create(cls, fn, args, kwargs):
+    if hasattr(fn, '_stateless_dofn_urn'):
+      assert not args and not kwargs
+      return StatelessDoFnInfo(fn._stateless_dofn_urn)
+    else:
+      return PickledDoFnInfo(cls._pickled_do_fn_info(fn, args, kwargs))
+
+  @staticmethod
+  def from_runner_api(spec, unused_context):
+    if spec.urn == python_urns.PICKLED_DOFN_INFO:
+      return PickledDoFnInfo(spec.payload)
+    elif spec.urn in StatelessDoFnInfo.REGISTERED_DOFNS:
+      return StatelessDoFnInfo(spec.urn)
+    else:
+      raise ValueError('Unexpected DoFn type: %s' % spec.urn)
+
+  @staticmethod
+  def _pickled_do_fn_info(fn, args, kwargs):
+    # This can be cleaned up once all runners move to portability.
+    return pickler.dumps((fn, args, kwargs, None, None))
+
+  def serialized_data(self):

Review comment:
       optional: should this be named `serialized_fn` or `serialized_dofn_data` to match the naming in `bundle_processor.py`?
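
For context on what the accessor under discussion would return, here is a minimal standalone sketch of the URN-registration pattern in the hunk above. It is not Beam's implementation: `PICKLED_URN`, `to_spec`, `from_spec`, and the example URN strings are hypothetical names, the standard `pickle` module stands in for Beam's `pickler`, and instantiating the registered class on decode is an assumption, since the diff does not show `StatelessDoFnInfo`.

```python
import pickle

# Hypothetical stand-in for python_urns.PICKLED_DOFN_INFO (not the real value).
PICKLED_URN = 'example:dofn:pickled:v1'

# Maps URN -> registered class, like StatelessDoFnInfo.REGISTERED_DOFNS.
REGISTERED_DOFNS = {}


def register_stateless_dofn(urn):
  """Class decorator: records cls under urn and tags cls with that urn."""
  def wrapper(cls):
    REGISTERED_DOFNS[urn] = cls
    cls._stateless_dofn_urn = urn
    return cls

  return wrapper


def to_spec(fn, args=(), kwargs=None):
  """Returns (urn, payload): empty payload if fn is registered, else pickled."""
  kwargs = kwargs or {}
  if hasattr(fn, '_stateless_dofn_urn'):
    # A registered DoFn is identified by its URN alone, so it takes no args.
    if args or kwargs:
      raise ValueError('Stateless DoFns take no arguments.')
    return fn._stateless_dofn_urn, b''
  return PICKLED_URN, pickle.dumps((fn, args, kwargs))


def from_spec(urn, payload):
  """Inverse of to_spec: returns (fn, args, kwargs)."""
  if urn == PICKLED_URN:
    return pickle.loads(payload)
  elif urn in REGISTERED_DOFNS:
    # Assumption: the registered class can be built with no arguments.
    return REGISTERED_DOFNS[urn](), (), {}
  raise ValueError('Unexpected DoFn type: %s' % urn)


@register_stateless_dofn('example:dofn:pair_with_none:v1')
class PairWithNone(object):
  def process(self, element):
    yield None, element
```

In this sketch the payload half of the pair is what a method like `serialized_data` would expose: empty for a registered DoFn, pickled bytes otherwise.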



