You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/28 18:10:04 UTC

[1/2] incubator-beam git commit: Optimize Map and FlatMap when there are no side inputs.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4716d9dc -> 351c3831d


Optimize Map and FlatMap when there are no side inputs.

Varargs and kwargs are expensive, even when they're empty.

This is especially true for otherwise one-argument Python calls,
which are special-cased in CPython.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d2fb1f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d2fb1f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d2fb1f8

Branch: refs/heads/python-sdk
Commit: 7d2fb1f88d1a2370dd4053f3a1738cbb9838cc2f
Parents: b4716d9
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 27 18:29:59 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:09:48 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 40 ++++++++++++++++++-------
 1 file changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d2fb1f8/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5e6aafc..38b9cd2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -20,6 +20,8 @@
 from __future__ import absolute_import
 
 import copy
+import inspect
+import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -194,6 +196,16 @@ class DoFn(WithTypeHints):
       return type_hint
 
 
+def _fn_takes_side_inputs(fn):
+  try:
+    argspec = inspect.getargspec(fn)
+  except TypeError:
+    # We can't tell; maybe it does.
+    return True
+  is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+  return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
+
+
 class CallableWrapperDoFn(DoFn):
   """A DoFn (function) object wrapping a callable object.
 
@@ -214,6 +226,11 @@ class CallableWrapperDoFn(DoFn):
       raise TypeError('Expected a callable object instead of: %r' % fn)
 
     self._fn = fn
+    if _fn_takes_side_inputs(fn):
+      self.process = lambda context, *args, **kwargs: fn(
+          context.element, *args, **kwargs)
+    else:
+      self.process = lambda context: fn(context.element)
 
     super(CallableWrapperDoFn, self).__init__()
 
@@ -237,9 +254,6 @@ class CallableWrapperDoFn(DoFn):
     return self._strip_output_annotations(
         trivial_inference.infer_return_type(self._fn, [input_type]))
 
-  def process(self, context, *args, **kwargs):
-    return self._fn(context.element, *args, **kwargs)
-
   def process_argspec_fn(self):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
@@ -676,7 +690,10 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
         'Map can be used only with callable objects. '
         'Received %r instead for %s argument.'
         % (fn, 'first' if label is None else 'second'))
-  wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  if _fn_takes_side_inputs(fn):
+    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  else:
+    wrapper = lambda x: [fn(x)]
 
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
@@ -1008,21 +1025,24 @@ class GroupByKey(PTransform):
       value_type = windowed_value_iter_type.inner_type.inner_type
       return Iterable[KV[key_type, Iterable[value_type]]]
 
-    def process(self, context):
-      k, vs = context.element
+    def start_bundle(self, context):
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.transforms.trigger import InMemoryUnmergedState
       from apache_beam.transforms.trigger import create_trigger_driver
       # pylint: enable=wrong-import-order, wrong-import-position
-      driver = create_trigger_driver(self.windowing, True)
-      state = InMemoryUnmergedState()
+      self.driver = create_trigger_driver(self.windowing, True)
+      self.state_type = InMemoryUnmergedState
+
+    def process(self, context):
+      k, vs = context.element
+      state = self.state_type()
       # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
+      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
         yield wvalue.with_value((k, wvalue.value))
       while state.timers:
         fired = state.get_and_clear_timers()
         for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in driver.process_timer(
+          for wvalue in self.driver.process_timer(
               timer_window, name, time_domain, fire_time, state):
             yield wvalue.with_value((k, wvalue.value))
 


[2/2] incubator-beam git commit: Closes #748

Posted by ro...@apache.org.
Closes #748


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/351c3831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/351c3831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/351c3831

Branch: refs/heads/python-sdk
Commit: 351c3831de1bdcfcb19b2f24f9f0b6a19e77e421
Parents: b4716d9 7d2fb1f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jul 28 11:09:49 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jul 28 11:09:49 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 40 ++++++++++++++++++-------
 1 file changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------