You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/08/27 01:30:44 UTC

[beam] branch master updated: Fix PairWithRestrictionFn process no attribute signature error

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b3b14c  Fix PairWithRestrictionFn process no attribute signature error
     new 7973f41  Merge pull request #12640 from y1chi/bundle_runner_sdf
8b3b14c is described below

commit 8b3b14cd8d28302be9c9bf9d0280ac4419ea4743
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Aug 19 16:07:59 2020 -0700

    Fix PairWithRestrictionFn process no attribute signature error
---
 sdks/python/apache_beam/runners/direct/sdf_direct_runner.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index ac27d57..7d605b2 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -117,19 +117,18 @@ class ElementAndRestriction(object):
 class PairWithRestrictionFn(beam.DoFn):
   """A transform that pairs each element with a restriction."""
   def __init__(self, do_fn):
-    self._do_fn = do_fn
+    self._signature = DoFnSignature(do_fn)
 
   def start_bundle(self):
-    signature = DoFnSignature(self._do_fn)
     self._invoker = DoFnInvoker.create_invoker(
-        signature,
+        self._signature,
         output_processor=_NoneShallPassOutputProcessor(),
         process_invocation=False)
 
   def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
     initial_restriction = self._invoker.invoke_initial_restriction(element)
     watermark_estimator_state = (
-        self.signature.process_method.watermark_estimator_provider.
+        self._signature.process_method.watermark_estimator_provider.
         initial_estimator_state(element, initial_restriction))
     yield ElementAndRestriction(
         element, initial_restriction, watermark_estimator_state)