Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:00:17 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r516397331



##########
File path: sdks/python/apache_beam/transforms/userstate.py
##########
@@ -357,6 +357,10 @@ def prefetch(self):
     # The default implementation here does nothing.
     pass
 
+  def finalize(self):

Review comment:
       @robertwb do you see any concerns with adding a top-level `finalize` method here?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
+  @staticmethod
+  def combinefn_visitor():
+    # Imported here to avoid circular dependencies.
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam import core
+
+    class CombineFnVisitor(PipelineVisitor):
+      """Checks if `CombineFn` has non-default setup or teardown methods.
+      If yes, raises `ValueError`.
+      """
+      def visit_transform(self, applied_transform):
+        transform = applied_transform.transform
+        if isinstance(transform, core.ParDo) and isinstance(
+            transform.fn, core.CombineValuesDoFn):
+          if self._overrides_setup_or_teardown(transform.fn.combinefn):
+            raise ValueError(
+                'CombineFn.setup and CombineFn.teardown are '
+                'not supported with non-portable Dataflow '
+                'runner. Please use Dataflow Runner V2 instead.')
+
+      @staticmethod
+      def _overrides_setup_or_teardown(combinefn):
+        return combinefn.__class__.setup is not core.CombineFn.setup or \

Review comment:
       nit: prefer parentheses `()` over the `\` line continuation.
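For illustration, here is a minimal sketch of the override check written with parentheses instead of a backslash. `BaseCombineFn`, `SumFn`, `SetupFn`, and `overrides_setup_or_teardown` are hypothetical stand-ins for `core.CombineFn`, user subclasses, and the visitor's helper, so the snippet runs without Beam installed. The `teardown` half of the check is also an assumption, because the quoted diff stops after the `setup` comparison.

```python
class BaseCombineFn:
    """Hypothetical stand-in for apache_beam's core.CombineFn."""

    def setup(self, *args, **kwargs):
        pass  # default: do nothing

    def teardown(self, *args, **kwargs):
        pass  # default: do nothing


class SumFn(BaseCombineFn):
    """Inherits the default setup/teardown unchanged."""


class SetupFn(BaseCombineFn):
    """Overrides setup, e.g. to open a client connection."""

    def setup(self, *args, **kwargs):
        self.ready = True


def overrides_setup_or_teardown(combinefn):
    # Looking the method up on the class returns the plain function, so an
    # identity check tells whether a subclass replaced the default.
    # Parentheses let the expression span lines without a `\`.
    return (
        combinefn.__class__.setup is not BaseCombineFn.setup or
        combinefn.__class__.teardown is not BaseCombineFn.teardown)


print(overrides_setup_or_teardown(SumFn()))    # False
print(overrides_setup_or_teardown(SetupFn()))  # True
```

The identity comparison works because an inherited method resolves to the very same function object as the base class's definition, while an override resolves to a different one.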

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1975,10 +1990,13 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.deepcopy(self.fn) if isinstance(self.fn, CombineFn) \

Review comment:
       nit: [prefer](https://www.python.org/dev/peps/pep-0008/#:~:text=The%20preferred%20way%20of%20wrapping,a%20backslash%20for%20line%20continuation.) using `()` over the `\` line-continuation token.
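A sketch of the `has_defaults` branch with the conditional expression wrapped in parentheses. Because the quoted diff is cut off, this is a hypothetical reconstruction that assumes the `else` branch keeps the `CombineFn.from_callable(self.fn)` fallback from the removed lines. `FakeCombineFn` and `default_value_for` are illustrative stand-ins, not Beam APIs. One effect of deep-copying the user's `CombineFn` is that any state the copy accumulates while computing the default value does not leak into the original instance.

```python
import copy


class FakeCombineFn:
    """Hypothetical stand-in for apache_beam's CombineFn."""

    def __init__(self):
        self.calls = 0  # counts apply() calls on this instance

    def apply(self, elements):
        self.calls += 1
        return sum(elements)

    @classmethod
    def from_callable(cls, fn):
        # Wrap a plain callable so it exposes the same apply() interface.
        wrapped = cls()
        wrapped.apply = lambda elements: fn(elements)
        return wrapped


def default_value_for(fn):
    # Parentheses replace the `\` continuation, as PEP 8 recommends.
    combine_fn = (
        copy.deepcopy(fn) if isinstance(fn, FakeCombineFn)
        else FakeCombineFn.from_callable(fn))
    return combine_fn.apply([])


user_fn = FakeCombineFn()
print(default_value_for(user_fn))  # 0
print(user_fn.calls)               # 0: only the deep copy was called
print(default_value_for(lambda xs: max(xs, default=-1)))  # -1
```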




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org