You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/08/27 01:30:44 UTC
[beam] branch master updated: Fix PairWithRestrictionFn process no
attribute signature error
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b3b14c Fix PairWithRestrictionFn process no attribute signature error
new 7973f41 Merge pull request #12640 from y1chi/bundle_runner_sdf
8b3b14c is described below
commit 8b3b14cd8d28302be9c9bf9d0280ac4419ea4743
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Wed Aug 19 16:07:59 2020 -0700
Fix PairWithRestrictionFn process no attribute signature error
---
sdks/python/apache_beam/runners/direct/sdf_direct_runner.py | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index ac27d57..7d605b2 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -117,19 +117,18 @@ class ElementAndRestriction(object):
class PairWithRestrictionFn(beam.DoFn):
"""A transform that pairs each element with a restriction."""
def __init__(self, do_fn):
- self._do_fn = do_fn
+ self._signature = DoFnSignature(do_fn)
def start_bundle(self):
- signature = DoFnSignature(self._do_fn)
self._invoker = DoFnInvoker.create_invoker(
- signature,
+ self._signature,
output_processor=_NoneShallPassOutputProcessor(),
process_invocation=False)
def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
initial_restriction = self._invoker.invoke_initial_restriction(element)
watermark_estimator_state = (
- self.signature.process_method.watermark_estimator_provider.
+ self._signature.process_method.watermark_estimator_provider.
initial_estimator_state(element, initial_restriction))
yield ElementAndRestriction(
element, initial_restriction, watermark_estimator_state)