You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/28 18:10:04 UTC
[1/2] incubator-beam git commit: Optimize Map and FlatMap when there are no side inputs.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk b4716d9dc -> 351c3831d
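Optimize Map and FlatMap when there are no side inputs.
Passing *args and **kwargs is expensive, even when they are empty.
This is especially true for calls that would otherwise pass a single
argument, which CPython special-cases.
Also create the GroupByKey trigger driver once per bundle in start_bundle
rather than once per element in process.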
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d2fb1f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d2fb1f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d2fb1f8
Branch: refs/heads/python-sdk
Commit: 7d2fb1f88d1a2370dd4053f3a1738cbb9838cc2f
Parents: b4716d9
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 27 18:29:59 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:09:48 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 40 ++++++++++++++++++-------
1 file changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d2fb1f8/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5e6aafc..38b9cd2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -20,6 +20,8 @@
from __future__ import absolute_import
import copy
+import inspect
+import types
from apache_beam import pvalue
from apache_beam import typehints
@@ -194,6 +196,16 @@ class DoFn(WithTypeHints):
return type_hint
+def _fn_takes_side_inputs(fn):
+ try:
+ argspec = inspect.getargspec(fn)
+ except TypeError:
+ # We can't tell; maybe it does.
+ return True
+ is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+ return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
+
+
class CallableWrapperDoFn(DoFn):
"""A DoFn (function) object wrapping a callable object.
@@ -214,6 +226,11 @@ class CallableWrapperDoFn(DoFn):
raise TypeError('Expected a callable object instead of: %r' % fn)
self._fn = fn
+ if _fn_takes_side_inputs(fn):
+ self.process = lambda context, *args, **kwargs: fn(
+ context.element, *args, **kwargs)
+ else:
+ self.process = lambda context: fn(context.element)
super(CallableWrapperDoFn, self).__init__()
@@ -237,9 +254,6 @@ class CallableWrapperDoFn(DoFn):
return self._strip_output_annotations(
trivial_inference.infer_return_type(self._fn, [input_type]))
- def process(self, context, *args, **kwargs):
- return self._fn(context.element, *args, **kwargs)
-
def process_argspec_fn(self):
return getattr(self._fn, '_argspec_fn', self._fn)
@@ -676,7 +690,10 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name
'Map can be used only with callable objects. '
'Received %r instead for %s argument.'
% (fn, 'first' if label is None else 'second'))
- wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+ if _fn_takes_side_inputs(fn):
+ wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+ else:
+ wrapper = lambda x: [fn(x)]
# Proxy the type-hint information from the original function to this new
# wrapped function.
@@ -1008,21 +1025,24 @@ class GroupByKey(PTransform):
value_type = windowed_value_iter_type.inner_type.inner_type
return Iterable[KV[key_type, Iterable[value_type]]]
- def process(self, context):
- k, vs = context.element
+ def start_bundle(self, context):
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.transforms.trigger import InMemoryUnmergedState
from apache_beam.transforms.trigger import create_trigger_driver
# pylint: enable=wrong-import-order, wrong-import-position
- driver = create_trigger_driver(self.windowing, True)
- state = InMemoryUnmergedState()
+ self.driver = create_trigger_driver(self.windowing, True)
+ self.state_type = InMemoryUnmergedState
+
+ def process(self, context):
+ k, vs = context.element
+ state = self.state_type()
# TODO(robertwb): Conditionally process in smaller chunks.
- for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
+ for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
yield wvalue.with_value((k, wvalue.value))
while state.timers:
fired = state.get_and_clear_timers()
for timer_window, (name, time_domain, fire_time) in fired:
- for wvalue in driver.process_timer(
+ for wvalue in self.driver.process_timer(
timer_window, name, time_domain, fire_time, state):
yield wvalue.with_value((k, wvalue.value))
[2/2] incubator-beam git commit: Closes #748
Posted by ro...@apache.org.
Closes #748
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/351c3831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/351c3831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/351c3831
Branch: refs/heads/python-sdk
Commit: 351c3831de1bdcfcb19b2f24f9f0b6a19e77e421
Parents: b4716d9 7d2fb1f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 11:09:49 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:09:49 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/core.py | 40 ++++++++++++++++++-------
1 file changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------