You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/22 00:36:33 UTC

[2/4] incubator-beam git commit: Cache dofn.process method.

Cache dofn.process method.

Saves another couple of percent.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c9d77ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c9d77ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c9d77ac

Branch: refs/heads/python-sdk
Commit: 7c9d77ac2509c7625c5709f885f8b5aadb4f9f74
Parents: f99aa77
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 21 12:11:03 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.pxd | 1 +
 sdks/python/apache_beam/runners/common.py  | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index e855376..f01a362 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -23,6 +23,7 @@ cdef type SideOutputValue, TimestampedValue
 cdef class DoFnRunner(object):
 
   cdef object dofn
+  cdef object dofn_process
   cdef object window_fn
   cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 134fb06..80db823 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -56,6 +56,7 @@ class DoFnRunner(object):
                step_name=None):
     if not args and not kwargs:
       self.dofn = fn
+      self.dofn_process = fn.process
     else:
       args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
 
@@ -70,6 +71,8 @@ class DoFnRunner(object):
         def finish_bundle(self, context):
           return fn.finish_bundle(context)
       self.dofn = CurriedFn()
+      self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
@@ -96,7 +99,7 @@ class DoFnRunner(object):
   def process(self, element):
     try:
       self.context.set_element(element)
-      self._process_outputs(element, self.dofn.process(self.context))
+      self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)