You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/18 16:53:56 UTC
[2/2] incubator-beam git commit: Start/finish bundle methods do not
take extra args anymore
Start/finish bundle methods do not take extra args anymore
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/39f58c74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/39f58c74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/39f58c74
Branch: refs/heads/python-sdk
Commit: 39f58c746c380c5759558041c56090a57d88944a
Parents: 36456c6
Author: Silviu Calinoiu <si...@google.com>
Authored: Fri Jul 15 13:52:44 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jul 18 09:53:52 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 4 ++--
sdks/python/apache_beam/transforms/core.py | 22 ++++++++++-----------
sdks/python/apache_beam/typehints/typecheck.py | 16 +++++++--------
3 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/39f58c74/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e33d4ce..3c0c3f6 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -62,13 +62,13 @@ class DoFnRunner(object):
class CurriedFn(core.DoFn):
def start_bundle(self, context):
- return fn.start_bundle(context, *args, **kwargs)
+ return fn.start_bundle(context)
def process(self, context):
return fn.process(context, *args, **kwargs)
def finish_bundle(self, context):
- return fn.finish_bundle(context, *args, **kwargs)
+ return fn.finish_bundle(context)
self.dofn = CurriedFn()
self.window_fn = windowing.windowfn
self.context = context
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/39f58c74/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 177a5cf..b288321 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -51,8 +51,13 @@ K = typehints.TypeVariable('K')
V = typehints.TypeVariable('V')
-class DoFnProcessContext(object):
- """A processing context passed to DoFn methods during execution.
+class DoFnContext(object):
+ """A context available to all methods of DoFn instance."""
+ pass
+
+
+class DoFnProcessContext(DoFnContext):
+ """A processing context passed to DoFn process() during execution.
Most importantly, a DoFn.process method will access context.element
to get the element it is supposed to process.
@@ -132,7 +137,7 @@ class DoFn(WithTypeHints):
return self._strip_output_annotations(
trivial_inference.infer_return_type(self.process, [input_type]))
- def start_bundle(self, context, *args, **kwargs):
+ def start_bundle(self, context):
"""Called before a bundle of elements is processed on a worker.
Elements to be processed are split into bundles and distributed
@@ -140,20 +145,15 @@ class DoFn(WithTypeHints):
of its bundle, it calls this method.
Args:
- context: a DoFnProcessContext object
- *args: side inputs
- **kwargs: keyword side inputs
-
+ context: a DoFnContext object
"""
pass
- def finish_bundle(self, context, *args, **kwargs):
+ def finish_bundle(self, context):
"""Called after a bundle of elements is processed on a worker.
Args:
- context: a DoFnProcessContext object
- *args: side inputs
- **kwargs: keyword side inputs
+ context: a DoFnContext object
"""
pass
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/39f58c74/sdks/python/apache_beam/typehints/typecheck.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index cd0a8f2..d759d55 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -54,13 +54,13 @@ class TypeCheckWrapperDoFn(DoFn):
# TODO(robertwb): Multi-output.
self._output_type_hint = type_hints.simple_output_type(label)
- def start_bundle(self, context, *args, **kwargs):
+ def start_bundle(self, context):
return self._type_check_result(
- self._dofn.start_bundle(context, *args, **kwargs))
+ self._dofn.start_bundle(context))
- def finish_bundle(self, context, *args, **kwargs):
+ def finish_bundle(self, context):
return self._type_check_result(
- self._dofn.finish_bundle(context, *args, **kwargs))
+ self._dofn.finish_bundle(context))
def process(self, context, *args, **kwargs):
if self._input_hints:
@@ -139,11 +139,11 @@ class OutputCheckWrapperDoFn(DoFn):
else:
return self._check_type(result)
- def start_bundle(self, context, *args, **kwargs):
- return self.run(self.dofn.start_bundle, context, args, kwargs)
+ def start_bundle(self, context):
+ return self.run(self.dofn.start_bundle, context, [], {})
- def finish_bundle(self, context, *args, **kwargs):
- return self.run(self.dofn.finish_bundle, context, args, kwargs)
+ def finish_bundle(self, context):
+ return self.run(self.dofn.finish_bundle, context, [], {})
def process(self, context, *args, **kwargs):
return self.run(self.dofn.process, context, args, kwargs)