You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/04/26 06:32:31 UTC
[1/2] beam git commit: Updates DoFn invocation logic to be more
extensible.
Repository: beam
Updated Branches:
refs/heads/master afb96d72a -> 009469972
Updates DoFn invocation logic to be more extensible.
Adds the following abstractions.
DoFnSignature: describes the signature of a given DoFn object.
DoFnInvoker: defines a particular way for invoking DoFn methods.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db375d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db375d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db375d4
Branch: refs/heads/master
Commit: 7db375d40f9b230d989c087ccfc08844c29afdac
Parents: afb96d7
Author: chamikara@google.com <ch...@google.com>
Authored: Fri Apr 7 13:41:28 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Apr 25 23:30:01 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 62 +++-
sdks/python/apache_beam/runners/common.py | 441 ++++++++++++++++--------
2 files changed, 344 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7db375d4/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5952942..f3395c1 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -28,32 +28,62 @@ cdef class Receiver(object):
cpdef receive(self, WindowedValue windowed_value)
-cdef class DoFnRunner(Receiver):
+cdef class DoFnMethodWrapper(object):
+ cdef public object args
+ cdef public object defaults
+ cdef public object method_value
- cdef object dofn
- cdef object dofn_process
- cdef object window_fn
+
+cdef class DoFnSignature(object):
+ cdef public DoFnMethodWrapper process_method
+ cdef public DoFnMethodWrapper start_bundle_method
+ cdef public DoFnMethodWrapper finish_bundle_method
+ cdef public object do_fn
+
+
+cdef class DoFnInvoker(object):
+ cdef public DoFnSignature signature
+ cdef OutputProcessor output_processor
+
+ cpdef invoke_process(self, WindowedValue windowed_value)
+ cpdef invoke_start_bundle(self)
+ cpdef invoke_finish_bundle(self)
+
+ # TODO(chamikara) define static method create_invoker() here.
+
+
+cdef class SimpleInvoker(DoFnInvoker):
+ cdef object process_method
+
+
+cdef class PerWindowInvoker(DoFnInvoker):
+ cdef list side_inputs
+ cdef DoFnContext context
+ cdef list args_for_process
+ cdef dict kwargs_for_process
+ cdef list placeholders
+ cdef bint has_windowed_inputs
+ cdef object process_method
+
+
+cdef class DoFnRunner(Receiver):
cdef DoFnContext context
- cdef object tagged_receivers
cdef LoggingContext logging_context
cdef object step_name
- cdef list args
- cdef dict kwargs
cdef ScopedMetricsContainer scoped_metrics_container
cdef list side_inputs
- cdef bint has_windowed_inputs
- cdef list placeholders
- cdef bint use_simple_invoker
+ cdef DoFnInvoker do_fn_invoker
+
+ cpdef process(self, WindowedValue windowed_value)
- cdef Receiver main_receivers
- cpdef process(self, WindowedValue element)
- cdef _dofn_invoker(self, WindowedValue element)
- cdef _dofn_simple_invoker(self, WindowedValue element)
- cdef _dofn_per_window_invoker(self, WindowedValue element)
+cdef class OutputProcessor(object):
+ cdef object window_fn
+ cdef Receiver main_receivers
+ cdef object tagged_receivers
@cython.locals(windowed_value=WindowedValue)
- cpdef _process_outputs(self, WindowedValue element, results)
+ cpdef process_outputs(self, WindowedValue element, results)
cdef class DoFnContext(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/7db375d4/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 64d6d00..08071a6 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -51,6 +51,262 @@ class Receiver(object):
raise NotImplementedError
+class DoFnMethodWrapper(object):
+ """Represents a method of a DoFn object."""
+
+ def __init__(self, do_fn, method_name):
+ """
+ Initiates a ``DoFnMethodWrapper``.
+
+ Args:
+ do_fn: A DoFn object that contains the method.
+ method_name: name of the method as a string.
+ """
+
+ args, _, _, defaults = do_fn.get_function_arguments(method_name)
+ defaults = defaults if defaults else []
+ method_value = getattr(do_fn, method_name)
+ self.method_value = method_value
+ self.args = args
+ self.defaults = defaults
+
+
+class DoFnSignature(object):
+ """Represents the signature of a given ``DoFn`` object.
+
+ Signature of a ``DoFn`` provides a view of the properties of a given ``DoFn``.
+ Among other things, this will give an extensible way for (1) accessing the
+ structure of the ``DoFn`` including methods and method parameters
+ (2) identifying features that a given ``DoFn`` supports, for example, whether
+ a given ``DoFn`` is a Splittable ``DoFn`` (
+ https://s.apache.org/splittable-do-fn) (3) validating a ``DoFn`` based on the
+ feature set offered by it.
+ """
+
+ def __init__(self, do_fn):
+ # We add a property here for all methods defined by Beam DoFn features.
+
+ assert isinstance(do_fn, core.DoFn)
+ self.do_fn = do_fn
+
+ self.process_method = DoFnMethodWrapper(do_fn, 'process')
+ self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle')
+ self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle')
+ self._validate()
+
+ def _validate(self):
+ self._validate_bundle_method(self.start_bundle_method)
+ self._validate_bundle_method(self.finish_bundle_method)
+
+ def _validate_bundle_method(self, method_wrapper):
+ # Bundle methods may only contain ContextParam.
+
+ # Here we use the fact that every DoFn parameter defined in core.DoFn has
+ # the value that is the same as the name of the parameter and ends with
+ # string 'Param'.
+ unsupported_dofn_params = [i for i in core.DoFn.__dict__ if (
+ i.endswith('Param') and i != 'ContextParam')]
+
+ for param in unsupported_dofn_params:
+ assert param not in method_wrapper.defaults
+
+
+class DoFnInvoker(object):
+ """An abstraction that can be used to execute DoFn methods.
+
+ A DoFnInvoker describes a particular way for invoking methods of a DoFn
+ represented by a given DoFnSignature."""
+
+ def __init__(self, output_processor, signature):
+ self.output_processor = output_processor
+ self.signature = signature
+
+ @staticmethod
+ def create_invoker(
+ output_processor,
+ signature, context, side_inputs, input_args, input_kwargs):
+ """ Creates a new DoFnInvoker based on given arguments.
+
+ Args:
+ signature: a DoFnSignature for the DoFn being invoked.
+ context: Context to be used when invoking the DoFn (deprecated).
+ side_inputs: side inputs to be used when invoking the process method.
+ input_args: arguments to be used when invoking the process method
+ input_kwargs: kwargs to be used when invoking the process method.
+ """
+ default_arg_values = signature.process_method.defaults
+ use_simple_invoker = (
+ not side_inputs and not input_args and not input_kwargs and
+ not default_arg_values)
+ if use_simple_invoker:
+ return SimpleInvoker(output_processor, signature)
+ else:
+ return PerWindowInvoker(
+ output_processor,
+ signature, context, side_inputs, input_args, input_kwargs)
+
+ def invoke_process(self, windowed_value):
+ """Invokes the DoFn.process() function.
+
+ Args:
+ windowed_value: a WindowedValue object that gives the element for which
+ process() method should be invoked along with the window
+ the element belongs to.
+ """
+ raise NotImplementedError
+
+ def invoke_start_bundle(self):
+ """Invokes the DoFn.start_bundle() method.
+ """
+ defaults = self.signature.start_bundle_method.defaults
+ args = [self.context if d == core.DoFn.ContextParam else d
+ for d in defaults]
+ self.output_processor.process_outputs(
+ None, self.signature.start_bundle_method.method_value(*args))
+
+ def invoke_finish_bundle(self):
+ """Invokes the DoFn.finish_bundle() method.
+ """
+ defaults = self.signature.finish_bundle_method.defaults
+ args = [self.context if d == core.DoFn.ContextParam else d
+ for d in defaults]
+ self.output_processor.process_outputs(
+ None, self.signature.finish_bundle_method.method_value(*args))
+
+
+class SimpleInvoker(DoFnInvoker):
+ """An invoker that processes elements ignoring windowing information."""
+
+ def __init__(self, output_processor, signature):
+ super(SimpleInvoker, self).__init__(output_processor, signature)
+ self.process_method = signature.process_method.method_value
+
+ def invoke_process(self, windowed_value):
+ self.output_processor.process_outputs(
+ windowed_value, self.process_method(windowed_value.value))
+
+
+class PerWindowInvoker(DoFnInvoker):
+ """An invoker that processes elements considering windowing information."""
+
+ def __init__(self, output_processor, signature, context,
+ side_inputs, input_args, input_kwargs):
+ super(PerWindowInvoker, self).__init__(output_processor, signature)
+ self.side_inputs = side_inputs
+ self.context = context
+ self.process_method = signature.process_method.method_value
+ default_arg_values = signature.process_method.defaults
+ self.has_windowed_inputs = (
+ not all(si.is_globally_windowed() for si in side_inputs) or
+ (core.DoFn.WindowParam in default_arg_values))
+
+ # Try to prepare all the arguments that can just be filled in
+ # without any additional work in the process function.
+ # Also cache all the placeholders needed in the process function.
+
+ # Fill in sideInputs if they are globally windowed
+ global_window = GlobalWindow()
+
+ input_args = input_args if input_args else []
+ input_kwargs = input_kwargs if input_kwargs else {}
+
+ if not self.has_windowed_inputs:
+ input_args, input_kwargs = util.insert_values_in_args(
+ input_args, input_kwargs, [si[global_window] for si in side_inputs])
+
+ arguments = signature.process_method.args
+ defaults = signature.process_method.defaults
+
+ # Create placeholder for element parameter of DoFn.process() method.
+ self_in_args = int(signature.do_fn.is_process_bounded())
+
+ class ArgPlaceholder(object):
+ def __init__(self, placeholder):
+ self.placeholder = placeholder
+
+ if core.DoFn.ElementParam not in default_arg_values:
+ args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args
+ args_with_placeholders = (
+ [ArgPlaceholder(core.DoFn.ElementParam)] + input_args[:args_to_pick])
+ else:
+ args_to_pick = len(arguments) - len(defaults) - self_in_args
+ args_with_placeholders = input_args[:args_to_pick]
+
+ # Fill the OtherPlaceholders for context, window or timestamp
+ remaining_args_iter = iter(input_args[args_to_pick:])
+ for a, d in zip(arguments[-len(defaults):], defaults):
+ if d == core.DoFn.ElementParam:
+ args_with_placeholders.append(ArgPlaceholder(d))
+ elif d == core.DoFn.ContextParam:
+ args_with_placeholders.append(ArgPlaceholder(d))
+ elif d == core.DoFn.WindowParam:
+ args_with_placeholders.append(ArgPlaceholder(d))
+ elif d == core.DoFn.TimestampParam:
+ args_with_placeholders.append(ArgPlaceholder(d))
+ elif d == core.DoFn.SideInputParam:
+ # If no more args are present then the value must be passed via kwarg
+ try:
+ args_with_placeholders.append(remaining_args_iter.next())
+ except StopIteration:
+ if a not in input_kwargs:
+ raise ValueError("Value for sideinput %s not provided" % a)
+ else:
+ # If no more args are present then the value must be passed via kwarg
+ try:
+ args_with_placeholders.append(remaining_args_iter.next())
+ except StopIteration:
+ pass
+ args_with_placeholders.extend(list(remaining_args_iter))
+
+ # Stash the list of placeholder positions for performance
+ self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(
+ args_with_placeholders)
+ if isinstance(x, ArgPlaceholder)]
+
+ self.args_for_process = args_with_placeholders
+ self.kwargs_for_process = input_kwargs
+
+ def invoke_process(self, windowed_value):
+ self.context.set_element(windowed_value)
+ # Call for the process function for each window if has windowed side inputs
+ # or if the process accesses the window parameter. We can just call it once
+ # otherwise as none of the arguments are changing
+ if self.has_windowed_inputs and len(windowed_value.windows) != 1:
+ for w in windowed_value.windows:
+ self._invoke_per_window(
+ WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)))
+ else:
+ self._invoke_per_window(windowed_value)
+
+ def _invoke_per_window(self, windowed_value):
+ if self.has_windowed_inputs:
+ window, = windowed_value.windows
+ args_for_process, kwargs_for_process = util.insert_values_in_args(
+ self.args_for_process, self.kwargs_for_process,
+ [si[window] for si in self.side_inputs])
+ else:
+ args_for_process, kwargs_for_process = (
+ self.args_for_process, self.kwargs_for_process)
+ # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
+ for i, p in self.placeholders:
+ if p == core.DoFn.ElementParam:
+ args_for_process[i] = windowed_value.value
+ elif p == core.DoFn.ContextParam:
+ args_for_process[i] = self.context
+ elif p == core.DoFn.WindowParam:
+ args_for_process[i] = window
+ elif p == core.DoFn.TimestampParam:
+ args_for_process[i] = windowed_value.timestamp
+
+ if kwargs_for_process:
+ self.output_processor.process_outputs(
+ windowed_value,
+ self.process_method(*args_for_process, **kwargs_for_process))
+ else:
+ self.output_processor.process_outputs(
+ windowed_value, self.process_method(*args_for_process))
+
+
class DoFnRunner(Receiver):
"""A helper class for executing ParDo operations.
"""
@@ -88,13 +344,9 @@ class DoFnRunner(Receiver):
state: handle for accessing DoFn state
scoped_metrics_container: Context switcher for metrics container
"""
- self.step_name = step_name
- self.window_fn = windowing.windowfn
- self.tagged_receivers = tagged_receivers
self.scoped_metrics_container = (scoped_metrics_container
or ScopedMetricsContainer())
-
- global_window = GlobalWindow()
+ self.step_name = step_name
# Need to support multiple iterations.
side_inputs = list(side_inputs)
@@ -104,172 +356,59 @@ class DoFnRunner(Receiver):
else:
self.logging_context = get_logging_context(logger, step_name=step_name)
- # Optimize for the common case.
- self.main_receivers = as_receiver(tagged_receivers[None])
-
# TODO(sourabh): Deprecate the use of context
if state:
assert context is None
- self.context = DoFnContext(self.step_name, state=state)
+ context = DoFnContext(step_name, state=state)
else:
assert context is not None
- self.context = context
+ context = context
- class ArgPlaceholder(object):
- def __init__(self, placeholder):
- self.placeholder = placeholder
-
- # Stash values for use in dofn_process.
- self.side_inputs = side_inputs
- self.has_windowed_inputs = not all(
- si.is_globally_windowed() for si in self.side_inputs)
+ self.context = context
- self.args = args if args else []
- self.kwargs = kwargs if kwargs else {}
- self.dofn = fn
- self.dofn_process = fn.process
-
- arguments, _, _, defaults = self.dofn.get_function_arguments('process')
- defaults = defaults if defaults else []
- self_in_args = int(self.dofn.is_process_bounded())
+ do_fn_signature = DoFnSignature(fn)
- self.use_simple_invoker = (
- not side_inputs and not args and not kwargs and not defaults)
- if self.use_simple_invoker:
- # As we're using the simple invoker we don't need to compute placeholders
- return
-
- self.has_windowed_inputs = (self.has_windowed_inputs or
- core.DoFn.WindowParam in defaults)
-
- # Try to prepare all the arguments that can just be filled in
- # without any additional work. in the process function.
- # Also cache all the placeholders needed in the process function.
-
- # Fill in sideInputs if they are globally windowed
- if not self.has_windowed_inputs:
- self.args, self.kwargs = util.insert_values_in_args(
- args, kwargs, [si[global_window] for si in side_inputs])
-
- # Create placeholder for element parameter
- if core.DoFn.ElementParam not in defaults:
- args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
- final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \
- self.args[:args_to_pick]
- else:
- args_to_pick = len(arguments) - len(defaults) - self_in_args
- final_args = self.args[:args_to_pick]
-
- # Fill the OtherPlaceholders for context, window or timestamp
- args = iter(self.args[args_to_pick:])
- for a, d in zip(arguments[-len(defaults):], defaults):
- if d == core.DoFn.ElementParam:
- final_args.append(ArgPlaceholder(d))
- elif d == core.DoFn.ContextParam:
- final_args.append(ArgPlaceholder(d))
- elif d == core.DoFn.WindowParam:
- final_args.append(ArgPlaceholder(d))
- elif d == core.DoFn.TimestampParam:
- final_args.append(ArgPlaceholder(d))
- elif d == core.DoFn.SideInputParam:
- # If no more args are present then the value must be passed via kwarg
- try:
- final_args.append(args.next())
- except StopIteration:
- if a not in self.kwargs:
- raise ValueError("Value for sideinput %s not provided" % a)
- else:
- # If no more args are present then the value must be passed via kwarg
- try:
- final_args.append(args.next())
- except StopIteration:
- pass
- final_args.extend(list(args))
- self.args = final_args
+ # Optimize for the common case.
+ main_receivers = as_receiver(tagged_receivers[None])
+ output_processor = OutputProcessor(
+ windowing.windowfn, main_receivers, tagged_receivers)
- # Stash the list of placeholder positions for performance
- self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args)
- if isinstance(x, ArgPlaceholder)]
+ self.do_fn_invoker = DoFnInvoker.create_invoker(
+ output_processor, do_fn_signature, context, side_inputs, args, kwargs)
def receive(self, windowed_value):
self.process(windowed_value)
- def _dofn_simple_invoker(self, element):
- self._process_outputs(element, self.dofn_process(element.value))
-
- def _dofn_per_window_invoker(self, element):
- if self.has_windowed_inputs:
- window, = element.windows
- args, kwargs = util.insert_values_in_args(
- self.args, self.kwargs, [si[window] for si in self.side_inputs])
- else:
- args, kwargs = self.args, self.kwargs
- # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
- for i, p in self.placeholders:
- if p == core.DoFn.ElementParam:
- args[i] = element.value
- elif p == core.DoFn.ContextParam:
- args[i] = self.context
- elif p == core.DoFn.WindowParam:
- args[i] = window
- elif p == core.DoFn.TimestampParam:
- args[i] = element.timestamp
- if not kwargs:
- self._process_outputs(element, self.dofn_process(*args))
- else:
- self._process_outputs(element, self.dofn_process(*args, **kwargs))
-
- def _dofn_invoker(self, element):
- self.context.set_element(element)
- # Call for the process function for each window if has windowed side inputs
- # or if the process accesses the window parameter. We can just call it once
- # otherwise as none of the arguments are changing
- if self.has_windowed_inputs and len(element.windows) != 1:
- for w in element.windows:
- self._dofn_per_window_invoker(
- WindowedValue(element.value, element.timestamp, (w,)))
- else:
- self._dofn_per_window_invoker(element)
-
- def _invoke_bundle_method(self, method):
+ def process(self, windowed_value):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
- self.context.set_element(None)
- f = getattr(self.dofn, method)
-
- _, _, _, defaults = self.dofn.get_function_arguments(method)
- defaults = defaults if defaults else []
- args = [self.context if d == core.DoFn.ContextParam else d
- for d in defaults]
- self._process_outputs(None, f(*args))
+ self.do_fn_invoker.invoke_process(windowed_value)
except BaseException as exn:
- self.reraise_augmented(exn)
+ self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()
- def start(self):
- self._invoke_bundle_method('start_bundle')
-
- def finish(self):
- self._invoke_bundle_method('finish_bundle')
-
- def process(self, element):
+ def _invoke_bundle_method(self, bundle_method):
try:
self.logging_context.enter()
self.scoped_metrics_container.enter()
- if self.use_simple_invoker:
- self._dofn_simple_invoker(element)
- else:
- self._dofn_invoker(element)
+ self.context.set_element(None)
+ bundle_method()
except BaseException as exn:
- self.reraise_augmented(exn)
+ self._reraise_augmented(exn)
finally:
self.scoped_metrics_container.exit()
self.logging_context.exit()
- def reraise_augmented(self, exn):
+ def start(self):
+ self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
+
+ def finish(self):
+ self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
+
+ def _reraise_augmented(self, exn):
if getattr(exn, '_tagged_with_step', False) or not self.step_name:
raise
args = exn.args
@@ -280,7 +419,23 @@ class DoFnRunner(Receiver):
else:
raise
- def _process_outputs(self, windowed_input_element, results):
+
+class OutputProcessor(object):
+ """Processes output produced by DoFn method invocations."""
+
+ def __init__(self, window_fn, main_receivers, tagged_receivers):
+ """Initializes ``OutputProcessor``.
+
+ Args:
+ window_fn: a windowing function (WindowFn).
+ main_receivers: main receiver object.
+ tagged_receivers: a dict of tag name to Receiver objects.
+ """
+ self.window_fn = window_fn
+ self.main_receivers = main_receivers
+ self.tagged_receivers = tagged_receivers
+
+ def process_outputs(self, windowed_input_element, results):
"""Dispatch the result of computation to the appropriate receivers.
A value wrapped in a OutputValue object will be unwrapped and
[2/2] beam git commit: This closes #2519
Posted by ch...@apache.org.
This closes #2519
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/00946997
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/00946997
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/00946997
Branch: refs/heads/master
Commit: 00946997229bf8a764ce0a135b2112bacd7a3993
Parents: afb96d7 7db375d
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Apr 25 23:31:35 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Apr 25 23:31:35 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 62 +++-
sdks/python/apache_beam/runners/common.py | 441 ++++++++++++++++--------
2 files changed, 344 insertions(+), 159 deletions(-)
----------------------------------------------------------------------