You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/22 00:36:32 UTC
[1/4] incubator-beam git commit: Remove expensive per-element-step logging context.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 37f426faf -> d78b38d1d
Remove expensive per-element-step logging context.
This logging context accounts for 3-4% of the total runtime.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f99aa77f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f99aa77f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f99aa77f
Branch: refs/heads/python-sdk
Commit: f99aa77f9acf5dbfdf762913a1f997d2c3658d3a
Parents: 37f426f
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 21 12:08:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 21 17:35:49 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.py | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f99aa77f/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index ef28c63..134fb06 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -95,9 +95,8 @@ class DoFnRunner(object):
def process(self, element):
try:
- with self.logger.PerThreadLoggingContext(step_name=self.step_name):
- self.context.set_element(element)
- self._process_outputs(element, self.dofn.process(self.context))
+ self.context.set_element(element)
+ self._process_outputs(element, self.dofn.process(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
[2/4] incubator-beam git commit: Cache dofn.process method.
Posted by ro...@apache.org.
Cache dofn.process method.
Saves another couple of percent.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c9d77ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c9d77ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c9d77ac
Branch: refs/heads/python-sdk
Commit: 7c9d77ac2509c7625c5709f885f8b5aadb4f9f74
Parents: f99aa77
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 21 12:11:03 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 1 +
sdks/python/apache_beam/runners/common.py | 5 ++++-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index e855376..f01a362 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -23,6 +23,7 @@ cdef type SideOutputValue, TimestampedValue
cdef class DoFnRunner(object):
cdef object dofn
+ cdef object dofn_process
cdef object window_fn
cdef object context # TODO(robertwb): Make this a DoFnContext
cdef object tagged_receivers
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 134fb06..80db823 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -56,6 +56,7 @@ class DoFnRunner(object):
step_name=None):
if not args and not kwargs:
self.dofn = fn
+ self.dofn_process = fn.process
else:
args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
@@ -70,6 +71,8 @@ class DoFnRunner(object):
def finish_bundle(self, context):
return fn.finish_bundle(context)
self.dofn = CurriedFn()
+ self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+
self.window_fn = windowing.windowfn
self.context = context
self.tagged_receivers = tagged_receivers
@@ -96,7 +99,7 @@ class DoFnRunner(object):
def process(self, element):
try:
self.context.set_element(element)
- self._process_outputs(element, self.dofn.process(self.context))
+ self._process_outputs(element, self.dofn_process(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
[4/4] incubator-beam git commit: Restore (faster) logging context.
Posted by ro...@apache.org.
Restore (faster) logging context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf9e3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf9e3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf9e3a3
Branch: refs/heads/python-sdk
Commit: ecf9e3a3cc3dcbb3413403d4558c95d4a0097350
Parents: 7c9d77a
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 21 15:24:01 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 8 +++++++-
sdks/python/apache_beam/runners/common.py | 20 ++++++++++++++------
2 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f01a362..7191659 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -27,7 +27,7 @@ cdef class DoFnRunner(object):
cdef object window_fn
cdef object context # TODO(robertwb): Make this a DoFnContext
cdef object tagged_receivers
- cdef object logger
+ cdef object logging_context # TODO(robertwb): Make this a LoggingContext
cdef object step_name
cdef object main_receivers # TODO(robertwb): Make this a Receiver
@@ -44,3 +44,9 @@ cdef class DoFnContext(object):
cdef class Receiver(object):
cdef receive(self, WindowedValue windowed_value)
+
+
+cdef class LoggingContext(object):
+ # TODO(robertwb): Optimize "with [cdef class]"
+ cpdef enter(self)
+ cpdef exit(self)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 80db823..a565645 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -29,14 +29,12 @@ from apache_beam.transforms.window import WindowFn
from apache_beam.utils.windowed_value import WindowedValue
-class FakeLogger(object):
- def PerThreadLoggingContext(self, *unused_args, **unused_kwargs):
- return self
+class LoggingContext(object):
- def __enter__(self):
+ def enter(self):
pass
- def __exit__(self, *unused_args):
+ def exit(self):
pass
@@ -76,7 +74,8 @@ class DoFnRunner(object):
self.window_fn = windowing.windowfn
self.context = context
self.tagged_receivers = tagged_receivers
- self.logger = logger or FakeLogger()
+ self.logging_context = (logger.PerThreadLoggingContext(step_name=step_name)
+ if logger else LoggingContext())
self.step_name = step_name
# Optimize for the common case.
@@ -85,23 +84,32 @@ class DoFnRunner(object):
def start(self):
self.context.set_element(None)
try:
+ self.logging_context.enter()
self._process_outputs(None, self.dofn.start_bundle(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
+ finally:
+ self.logging_context.exit()
def finish(self):
self.context.set_element(None)
try:
+ self.logging_context.enter()
self._process_outputs(None, self.dofn.finish_bundle(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
+ finally:
+ self.logging_context.exit()
def process(self, element):
try:
+ self.logging_context.enter()
self.context.set_element(element)
self._process_outputs(element, self.dofn_process(self.context))
except BaseException as exn:
self.reraise_augmented(exn)
+ finally:
+ self.logging_context.exit()
def reraise_augmented(self, exn):
if getattr(exn, '_tagged_with_step', False) or not self.step_name:
[3/4] incubator-beam git commit: Closes #706
Posted by ro...@apache.org.
Closes #706
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d78b38d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d78b38d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d78b38d1
Branch: refs/heads/python-sdk
Commit: d78b38d1dab75f65aa6dfc3bca53d58bafc1d37a
Parents: 37f426f ecf9e3a
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Jul 21 17:36:04 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 9 +++++++-
sdks/python/apache_beam/runners/common.py | 28 +++++++++++++++++--------
2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------