You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/04/26 06:32:31 UTC

[1/2] beam git commit: Updates DoFn invocation logic to be more extensible.

Repository: beam
Updated Branches:
  refs/heads/master afb96d72a -> 009469972

Updates DoFn invocation logic to be more extensible.

Adds the following abstractions:

DoFnSignature: describes the signature of a given DoFn object.
DoFnInvoker: defines a particular way for invoking DoFn methods.


Branch: refs/heads/master
Commit: 7db375d40f9b230d989c087ccfc08844c29afdac
Parents: afb96d7
Author: <>
Authored: Fri Apr 7 13:41:28 2017 -0700
Committer: <>
Committed: Tue Apr 25 23:30:01 2017 -0700

 sdks/python/apache_beam/runners/common.pxd |  62 +++-
 sdks/python/apache_beam/runners/common.py  | 441 ++++++++++++++++--------
 2 files changed, 344 insertions(+), 159 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 5952942..f3395c1 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -28,32 +28,62 @@ cdef class Receiver(object):
   cpdef receive(self, WindowedValue windowed_value)
-cdef class DoFnRunner(Receiver):
+cdef class DoFnMethodWrapper(object):
+  cdef public object args
+  cdef public object defaults
+  cdef public object method_value
-  cdef object dofn
-  cdef object dofn_process
-  cdef object window_fn
+cdef class DoFnSignature(object):
+  cdef public DoFnMethodWrapper process_method
+  cdef public DoFnMethodWrapper start_bundle_method
+  cdef public DoFnMethodWrapper finish_bundle_method
+  cdef public object do_fn
+cdef class DoFnInvoker(object):
+  cdef public DoFnSignature signature
+  cdef OutputProcessor output_processor
+  cpdef invoke_process(self, WindowedValue windowed_value)
+  cpdef invoke_start_bundle(self)
+  cpdef invoke_finish_bundle(self)
+  # TODO(chamikara) define static method create_invoker() here.
+cdef class SimpleInvoker(DoFnInvoker):
+  cdef object process_method
+cdef class PerWindowInvoker(DoFnInvoker):
+  cdef list side_inputs
+  cdef DoFnContext context
+  cdef list args_for_process
+  cdef dict kwargs_for_process
+  cdef list placeholders
+  cdef bint has_windowed_inputs
+  cdef object process_method
+cdef class DoFnRunner(Receiver):
   cdef DoFnContext context
-  cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
-  cdef list args
-  cdef dict kwargs
   cdef ScopedMetricsContainer scoped_metrics_container
   cdef list side_inputs
-  cdef bint has_windowed_inputs
-  cdef list placeholders
-  cdef bint use_simple_invoker
+  cdef DoFnInvoker do_fn_invoker
+  cpdef process(self, WindowedValue windowed_value)
-  cdef Receiver main_receivers
-  cpdef process(self, WindowedValue element)
-  cdef _dofn_invoker(self, WindowedValue element)
-  cdef _dofn_simple_invoker(self, WindowedValue element)
-  cdef _dofn_per_window_invoker(self, WindowedValue element)
+cdef class OutputProcessor(object):
+  cdef object window_fn
+  cdef Receiver main_receivers
+  cdef object tagged_receivers
-  cpdef _process_outputs(self, WindowedValue element, results)
+  cpdef process_outputs(self, WindowedValue element, results)
 cdef class DoFnContext(object):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 64d6d00..08071a6 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -51,6 +51,262 @@ class Receiver(object):
     raise NotImplementedError
+class DoFnMethodWrapper(object):
+  """Represents a method of a DoFn object."""
+  def __init__(self, do_fn, method_name):
+    """
+    Initializes a ``DoFnMethodWrapper``.
+    Args:
+      do_fn: A DoFn object that contains the method.
+      method_name: name of the method as a string.
+    """
+    args, _, _, defaults = do_fn.get_function_arguments(method_name)
+    defaults = defaults if defaults else []
+    method_value = getattr(do_fn, method_name)
+    self.method_value = method_value
+    self.args = args
+    self.defaults = defaults
+class DoFnSignature(object):
+  """Represents the signature of a given ``DoFn`` object.
+  Signature of a ``DoFn`` provides a view of the properties of a given ``DoFn``.
+  Among other things, this will give an extensible way for (1) accessing the
+  structure of the ``DoFn`` including methods and method parameters
+  (2) identifying features that a given ``DoFn`` supports, for example, whether
+  a given ``DoFn`` is a Splittable ``DoFn`` (
+  https://s.apache.org/splittable-do-fn) (3) validating a ``DoFn`` based on the
+  feature set offered by it.
+  """
+  def __init__(self, do_fn):
+    # We add a property here for all methods defined by Beam DoFn features.
+    assert isinstance(do_fn, core.DoFn)
+    self.do_fn = do_fn
+    self.process_method = DoFnMethodWrapper(do_fn, 'process')
+    self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle')
+    self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle')
+    self._validate()
+  def _validate(self):
+    self._validate_bundle_method(self.start_bundle_method)
+    self._validate_bundle_method(self.finish_bundle_method)
+  def _validate_bundle_method(self, method_wrapper):
+    # Bundle methods may only contain ContextParam.
+    # Here we use the fact that every DoFn parameter defined in core.DoFn has
+    # the value that is the same as the name of the parameter and ends with
+    # string 'Param'.
+    unsupported_dofn_params = [i for i in core.DoFn.__dict__ if (
+        i.endswith('Param') and i != 'ContextParam')]
+    for param in unsupported_dofn_params:
+      assert param not in method_wrapper.defaults
+class DoFnInvoker(object):
+  """An abstraction that can be used to execute DoFn methods.
+  A DoFnInvoker describes a particular way for invoking methods of a DoFn
+  represented by a given DoFnSignature."""
+  def __init__(self, output_processor, signature):
+    self.output_processor = output_processor
+    self.signature = signature
+  @staticmethod
+  def create_invoker(
+      output_processor,
+      signature, context, side_inputs, input_args, input_kwargs):
+    """ Creates a new DoFnInvoker based on given arguments.
+    Args:
+        signature: a DoFnSignature for the DoFn being invoked.
+        context: Context to be used when invoking the DoFn (deprecated).
+        side_inputs: side inputs to be used when invoking the process method.
+        input_args: arguments to be used when invoking the process method
+        input_kwargs: kwargs to be used when invoking the process method.
+    """
+    default_arg_values = signature.process_method.defaults
+    use_simple_invoker = (
+        not side_inputs and not input_args and not input_kwargs and
+        not default_arg_values)
+    if use_simple_invoker:
+      return SimpleInvoker(output_processor, signature)
+    else:
+      return PerWindowInvoker(
+          output_processor,
+          signature, context, side_inputs, input_args, input_kwargs)
+  def invoke_process(self, windowed_value):
+    """Invokes the DoFn.process() function.
+    Args:
+      windowed_value: a WindowedValue object that gives the element for which
+                      process() method should be invoked along with the window
+                      the element belongs to.
+    """
+    raise NotImplementedError
+  def invoke_start_bundle(self):
+    """Invokes the DoFn.start_bundle() method.
+    """
+    defaults = self.signature.start_bundle_method.defaults
+    args = [self.context if d == core.DoFn.ContextParam else d
+            for d in defaults]
+    self.output_processor.process_outputs(
+        None, self.signature.start_bundle_method.method_value(*args))
+  def invoke_finish_bundle(self):
+    """Invokes the DoFn.finish_bundle() method.
+    """
+    defaults = self.signature.finish_bundle_method.defaults
+    args = [self.context if d == core.DoFn.ContextParam else d
+            for d in defaults]
+    self.output_processor.process_outputs(
+        None, self.signature.finish_bundle_method.method_value(*args))
+class SimpleInvoker(DoFnInvoker):
+  """An invoker that processes elements ignoring windowing information."""
+  def __init__(self, output_processor, signature):
+    super(SimpleInvoker, self).__init__(output_processor, signature)
+    self.process_method = signature.process_method.method_value
+  def invoke_process(self, windowed_value):
+    self.output_processor.process_outputs(
+        windowed_value, self.process_method(windowed_value.value))
+class PerWindowInvoker(DoFnInvoker):
+  """An invoker that processes elements considering windowing information."""
+  def __init__(self, output_processor, signature, context,
+               side_inputs, input_args, input_kwargs):
+    super(PerWindowInvoker, self).__init__(output_processor, signature)
+    self.side_inputs = side_inputs
+    self.context = context
+    self.process_method = signature.process_method.method_value
+    default_arg_values = signature.process_method.defaults
+    self.has_windowed_inputs = (
+        not all(si.is_globally_windowed() for si in side_inputs) or
+        (core.DoFn.WindowParam in default_arg_values))
+    # Try to prepare all the arguments that can just be filled in
+    # without any additional work. in the process function.
+    # Also cache all the placeholders needed in the process function.
+    # Fill in sideInputs if they are globally windowed
+    global_window = GlobalWindow()
+    input_args = input_args if input_args else []
+    input_kwargs = input_kwargs if input_kwargs else {}
+    if not self.has_windowed_inputs:
+      input_args, input_kwargs = util.insert_values_in_args(
+          input_args, input_kwargs, [si[global_window] for si in side_inputs])
+    arguments = signature.process_method.args
+    defaults = signature.process_method.defaults
+    # Create placeholder for element parameter of DoFn.process() method.
+    self_in_args = int(signature.do_fn.is_process_bounded())
+    class ArgPlaceholder(object):
+      def __init__(self, placeholder):
+        self.placeholder = placeholder
+    if core.DoFn.ElementParam not in default_arg_values:
+      args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args
+      args_with_placeholders = (
+          [ArgPlaceholder(core.DoFn.ElementParam)] + input_args[:args_to_pick])
+    else:
+      args_to_pick = len(arguments) - len(defaults) - self_in_args
+      args_with_placeholders = input_args[:args_to_pick]
+    # Fill the OtherPlaceholders for context, window or timestamp
+    remaining_args_iter = iter(input_args[args_to_pick:])
+    for a, d in zip(arguments[-len(defaults):], defaults):
+      if d == core.DoFn.ElementParam:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif d == core.DoFn.ContextParam:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif d == core.DoFn.WindowParam:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif d == core.DoFn.TimestampParam:
+        args_with_placeholders.append(ArgPlaceholder(d))
+      elif d == core.DoFn.SideInputParam:
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          args_with_placeholders.append(remaining_args_iter.next())
+        except StopIteration:
+          if a not in input_kwargs:
+            raise ValueError("Value for sideinput %s not provided" % a)
+      else:
+        # If no more args are present then the value must be passed via kwarg
+        try:
+          args_with_placeholders.append(remaining_args_iter.next())
+        except StopIteration:
+          pass
+    args_with_placeholders.extend(list(remaining_args_iter))
+    # Stash the list of placeholder positions for performance
+    self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(
+        args_with_placeholders)
+                         if isinstance(x, ArgPlaceholder)]
+    self.args_for_process = args_with_placeholders
+    self.kwargs_for_process = input_kwargs
+  def invoke_process(self, windowed_value):
+    self.context.set_element(windowed_value)
+    # Call for the process function for each window if has windowed side inputs
+    # or if the process accesses the window parameter. We can just call it once
+    # otherwise as none of the arguments are changing
+    if self.has_windowed_inputs and len(windowed_value.windows) != 1:
+      for w in windowed_value.windows:
+        self._invoke_per_window(
+            WindowedValue(windowed_value.value, windowed_value.timestamp, (w,)))
+    else:
+      self._invoke_per_window(windowed_value)
+  def _invoke_per_window(self, windowed_value):
+    if self.has_windowed_inputs:
+      window, = windowed_value.windows
+      args_for_process, kwargs_for_process = util.insert_values_in_args(
+          self.args_for_process, self.kwargs_for_process,
+          [si[window] for si in self.side_inputs])
+    else:
+      args_for_process, kwargs_for_process = (
+          self.args_for_process, self.kwargs_for_process)
+    # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
+    for i, p in self.placeholders:
+      if p == core.DoFn.ElementParam:
+        args_for_process[i] = windowed_value.value
+      elif p == core.DoFn.ContextParam:
+        args_for_process[i] = self.context
+      elif p == core.DoFn.WindowParam:
+        args_for_process[i] = window
+      elif p == core.DoFn.TimestampParam:
+        args_for_process[i] = windowed_value.timestamp
+    if kwargs_for_process:
+      self.output_processor.process_outputs(
+          windowed_value,
+          self.process_method(*args_for_process, **kwargs_for_process))
+    else:
+      self.output_processor.process_outputs(
+          windowed_value, self.process_method(*args_for_process))
 class DoFnRunner(Receiver):
   """A helper class for executing ParDo operations.
@@ -88,13 +344,9 @@ class DoFnRunner(Receiver):
       state: handle for accessing DoFn state
       scoped_metrics_container: Context switcher for metrics container
-    self.step_name = step_name
-    self.window_fn = windowing.windowfn
-    self.tagged_receivers = tagged_receivers
     self.scoped_metrics_container = (scoped_metrics_container
                                      or ScopedMetricsContainer())
-    global_window = GlobalWindow()
+    self.step_name = step_name
     # Need to support multiple iterations.
     side_inputs = list(side_inputs)
@@ -104,172 +356,59 @@ class DoFnRunner(Receiver):
       self.logging_context = get_logging_context(logger, step_name=step_name)
-    # Optimize for the common case.
-    self.main_receivers = as_receiver(tagged_receivers[None])
     # TODO(sourabh): Deprecate the use of context
     if state:
       assert context is None
-      self.context = DoFnContext(self.step_name, state=state)
+      context = DoFnContext(step_name, state=state)
       assert context is not None
-      self.context = context
+      context = context
-    class ArgPlaceholder(object):
-      def __init__(self, placeholder):
-        self.placeholder = placeholder
-    # Stash values for use in dofn_process.
-    self.side_inputs = side_inputs
-    self.has_windowed_inputs = not all(
-        si.is_globally_windowed() for si in self.side_inputs)
+    self.context = context
-    self.args = args if args else []
-    self.kwargs = kwargs if kwargs else {}
-    self.dofn = fn
-    self.dofn_process = fn.process
-    arguments, _, _, defaults = self.dofn.get_function_arguments('process')
-    defaults = defaults if defaults else []
-    self_in_args = int(self.dofn.is_process_bounded())
+    do_fn_signature = DoFnSignature(fn)
-    self.use_simple_invoker = (
-        not side_inputs and not args and not kwargs and not defaults)
-    if self.use_simple_invoker:
-      # As we're using the simple invoker we don't need to compute placeholders
-      return
-    self.has_windowed_inputs = (self.has_windowed_inputs or
-                                core.DoFn.WindowParam in defaults)
-    # Try to prepare all the arguments that can just be filled in
-    # without any additional work. in the process function.
-    # Also cache all the placeholders needed in the process function.
-    # Fill in sideInputs if they are globally windowed
-    if not self.has_windowed_inputs:
-      self.args, self.kwargs = util.insert_values_in_args(
-          args, kwargs, [si[global_window] for si in side_inputs])
-    # Create placeholder for element parameter
-    if core.DoFn.ElementParam not in defaults:
-      args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args
-      final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \
-                   self.args[:args_to_pick]
-    else:
-      args_to_pick = len(arguments) - len(defaults) - self_in_args
-      final_args = self.args[:args_to_pick]
-    # Fill the OtherPlaceholders for context, window or timestamp
-    args = iter(self.args[args_to_pick:])
-    for a, d in zip(arguments[-len(defaults):], defaults):
-      if d == core.DoFn.ElementParam:
-        final_args.append(ArgPlaceholder(d))
-      elif d == core.DoFn.ContextParam:
-        final_args.append(ArgPlaceholder(d))
-      elif d == core.DoFn.WindowParam:
-        final_args.append(ArgPlaceholder(d))
-      elif d == core.DoFn.TimestampParam:
-        final_args.append(ArgPlaceholder(d))
-      elif d == core.DoFn.SideInputParam:
-        # If no more args are present then the value must be passed via kwarg
-        try:
-          final_args.append(args.next())
-        except StopIteration:
-          if a not in self.kwargs:
-            raise ValueError("Value for sideinput %s not provided" % a)
-      else:
-        # If no more args are present then the value must be passed via kwarg
-        try:
-          final_args.append(args.next())
-        except StopIteration:
-          pass
-    final_args.extend(list(args))
-    self.args = final_args
+    # Optimize for the common case.
+    main_receivers = as_receiver(tagged_receivers[None])
+    output_processor = OutputProcessor(
+        windowing.windowfn, main_receivers, tagged_receivers)
-    # Stash the list of placeholder positions for performance
-    self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args)
-                         if isinstance(x, ArgPlaceholder)]
+    self.do_fn_invoker = DoFnInvoker.create_invoker(
+        output_processor, do_fn_signature, context, side_inputs, args, kwargs)
   def receive(self, windowed_value):
-  def _dofn_simple_invoker(self, element):
-    self._process_outputs(element, self.dofn_process(element.value))
-  def _dofn_per_window_invoker(self, element):
-    if self.has_windowed_inputs:
-      window, = element.windows
-      args, kwargs = util.insert_values_in_args(
-          self.args, self.kwargs, [si[window] for si in self.side_inputs])
-    else:
-      args, kwargs = self.args, self.kwargs
-    # TODO(sourabhbajaj): Investigate why we can't use `is` instead of ==
-    for i, p in self.placeholders:
-      if p == core.DoFn.ElementParam:
-        args[i] = element.value
-      elif p == core.DoFn.ContextParam:
-        args[i] = self.context
-      elif p == core.DoFn.WindowParam:
-        args[i] = window
-      elif p == core.DoFn.TimestampParam:
-        args[i] = element.timestamp
-    if not kwargs:
-      self._process_outputs(element, self.dofn_process(*args))
-    else:
-      self._process_outputs(element, self.dofn_process(*args, **kwargs))
-  def _dofn_invoker(self, element):
-    self.context.set_element(element)
-    # Call for the process function for each window if has windowed side inputs
-    # or if the process accesses the window parameter. We can just call it once
-    # otherwise as none of the arguments are changing
-    if self.has_windowed_inputs and len(element.windows) != 1:
-      for w in element.windows:
-        self._dofn_per_window_invoker(
-            WindowedValue(element.value, element.timestamp, (w,)))
-    else:
-      self._dofn_per_window_invoker(element)
-  def _invoke_bundle_method(self, method):
+  def process(self, windowed_value):
-      self.context.set_element(None)
-      f = getattr(self.dofn, method)
-      _, _, _, defaults = self.dofn.get_function_arguments(method)
-      defaults = defaults if defaults else []
-      args = [self.context if d == core.DoFn.ContextParam else d
-              for d in defaults]
-      self._process_outputs(None, f(*args))
+      self.do_fn_invoker.invoke_process(windowed_value)
     except BaseException as exn:
-      self.reraise_augmented(exn)
+      self._reraise_augmented(exn)
-  def start(self):
-    self._invoke_bundle_method('start_bundle')
-  def finish(self):
-    self._invoke_bundle_method('finish_bundle')
-  def process(self, element):
+  def _invoke_bundle_method(self, bundle_method):
-      if self.use_simple_invoker:
-        self._dofn_simple_invoker(element)
-      else:
-        self._dofn_invoker(element)
+      self.context.set_element(None)
+      bundle_method()
     except BaseException as exn:
-      self.reraise_augmented(exn)
+      self._reraise_augmented(exn)
-  def reraise_augmented(self, exn):
+  def start(self):
+    self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
+  def finish(self):
+    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
+  def _reraise_augmented(self, exn):
     if getattr(exn, '_tagged_with_step', False) or not self.step_name:
     args = exn.args
@@ -280,7 +419,23 @@ class DoFnRunner(Receiver):
-  def _process_outputs(self, windowed_input_element, results):
+class OutputProcessor(object):
+  """Processes output produced by DoFn method invocations."""
+  def __init__(self, window_fn, main_receivers, tagged_receivers):
+    """Initializes ``OutputProcessor``.
+    Args:
+      window_fn: a windowing function (WindowFn).
+      main_receivers: main receiver object.
+      tagged_receivers: a dict of tag name to Receiver objects.
+    """
+    self.window_fn = window_fn
+    self.main_receivers = main_receivers
+    self.tagged_receivers = tagged_receivers
+  def process_outputs(self, windowed_input_element, results):
     """Dispatch the result of computation to the appropriate receivers.
     A value wrapped in a OutputValue object will be unwrapped and

[2/2] beam git commit: This closes #2519

Posted by
This closes #2519


Branch: refs/heads/master
Commit: 00946997229bf8a764ce0a135b2112bacd7a3993
Parents: afb96d7 7db375d
Author: <>
Authored: Tue Apr 25 23:31:35 2017 -0700
Committer: <>
Committed: Tue Apr 25 23:31:35 2017 -0700

 sdks/python/apache_beam/runners/common.pxd |  62 +++-
 sdks/python/apache_beam/runners/common.py  | 441 ++++++++++++++++--------
 2 files changed, 344 insertions(+), 159 deletions(-)