You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/06 00:15:44 UTC

[1/2] beam git commit: Adds API for defining Splittable DoFns.

Repository: beam
Updated Branches:
  refs/heads/master 2090ee324 -> 31da49cc1


Adds API for defining Splittable DoFns.

See https://s.apache.org/splittable-do-fn-python-sdk for the design.

This PR and the above doc were updated to reflect the following recent updates to Splittable DoFn:
* Support for ProcessContinuations
* Support for dynamically updating the output watermark independently of output element production.

This will be followed by a PR that adds support for reading Splittable DoFns using DirectRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1a70a36
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1a70a36
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1a70a36

Branch: refs/heads/master
Commit: d1a70a36cabd2f32ef57b99dc33877826d83cafd
Parents: 2090ee3
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Sep 21 17:43:11 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Oct 5 17:14:56 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py       |  72 ++++++++++++
 sdks/python/apache_beam/transforms/core.py | 143 +++++++++++++++++++++++-
 2 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1a70a36/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 1f2a8bf..7cffa7f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1013,3 +1013,75 @@ class _RoundRobinKeyFn(core.DoFn):
     if self.counter >= self.count:
       self.counter -= self.count
     yield self.counter, element
+
+
+class RestrictionTracker(object):
+  """Manages concurrent access to a restriction.
+
+  Experimental; no backwards-compatibility guarantees.
+
+  Keeps track of the claimed part of a restriction for a Splittable DoFn.
+
+  See the following documents for more details.
+  * https://s.apache.org/splittable-do-fn
+  * https://s.apache.org/splittable-do-fn-python-sdk
+  """
+
+  def current_restriction(self):
+    """Returns the current restriction.
+
+    Returns a restriction accurately describing the full range of work the
+    current ``DoFn.process()`` call will do, including already completed work.
+
+    The current restriction returned by this method may be updated
+    dynamically due to concurrent invocation of other methods of the
+    ``RestrictionTracker``, for example, ``checkpoint()``.
+
+    ** Thread safety **
+
+    Methods of the class ``RestrictionTracker`` including this method may get
+    invoked by different threads, hence must be made thread-safe, e.g. by using
+    a single lock object.
+    """
+    raise NotImplementedError
+
+  def checkpoint(self):
+    """Performs a checkpoint of the current restriction.
+
+    Signals that the current ``DoFn.process()`` call should terminate as soon as
+    possible. After this method returns, the tracker MUST refuse all future
+    claim calls, and ``RestrictionTracker.check_done()`` MUST succeed.
+
+    This method modifies the value returned by ``current_restriction()`` and
+    returns a restriction representing the rest of the work. The
+    old value of ``current_restriction()`` is equivalent to the new value of
+    ``current_restriction()`` and the return value of this method invocation
+    combined.
+
+    ** Thread safety **
+
+    Methods of the class ``RestrictionTracker`` including this method may get
+    invoked by different threads, hence must be made thread-safe, e.g. by using
+    a single lock object.
+    """
+
+    raise NotImplementedError
+
+  def check_done(self):
+    """Checks whether the restriction has been fully processed.
+
+    Called by the runner after the iterator returned by ``DoFn.process()`` has
+    been fully read.
+
+    Returns: ``True`` if the current restriction has been fully processed.
+    Raises ValueError: if there is still any unclaimed work remaining in the
+      restriction when this method is invoked. The exception must have an
+      informative error message.
+
+    ** Thread safety **
+
+    Methods of the class ``RestrictionTracker`` including this method may get
+    invoked by different threads, hence must be made thread-safe, e.g. by using
+    a single lock object.
+    """
+    raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/d1a70a36/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 153dc32..41e20ba 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -89,6 +89,8 @@ class DoFnContext(object):
 class DoFnProcessContext(DoFnContext):
   """A processing context passed to DoFn process() during execution.
 
+  Experimental; no backwards-compatibility guarantees.
+
   Most importantly, a DoFn.process method will access context.element
   to get the element it is supposed to process.
 
@@ -137,6 +139,126 @@ class DoFnProcessContext(DoFnContext):
       self.windows = windowed_value.windows
 
 
+class ProcessContinuation(object):
+  """An object that may be produced as the last element of ``DoFn.process()``.
+
+  Experimental; no backwards-compatibility guarantees.
+
+  If produced, indicates that there is more work to be done for the current
+  input element, and that the runner should later re-invoke ``DoFn.process()``
+  to resume processing it.
+  """
+
+  def __init__(self, resume_delay=0):
+    """Initializes a ProcessContinuation object.
+
+    Args:
+      resume_delay: the minimum time, in seconds, that should elapse before
+        ``DoFn.process()`` is re-invoked to resume processing of the current
+        element.
+    """
+    self.resume_delay = resume_delay
+
+  @staticmethod
+  def resume(resume_delay=0):
+    """Convenience method that produces a ``ProcessContinuation``.
+
+    Args:
+      resume_delay: minimum delay, in seconds, after which processing of the
+        current element should be resumed.
+    Returns: a ``ProcessContinuation`` signalling the runner that the current
+      input element has not been fully processed and should be resumed later.
+    """
+    return ProcessContinuation(resume_delay=resume_delay)
+
+
+class RestrictionProvider(object):
+  """Provides methods for generating and manipulating restrictions.
+
+  This class should be implemented to support Splittable ``DoFn``s in the
+  Python SDK. See https://s.apache.org/splittable-do-fn for more details
+  about Splittable ``DoFn``s.
+
+  To mark a ``DoFn`` class as a Splittable ``DoFn``, the ``DoFn.process()``
+  method of that class should have exactly one parameter whose default value is
+  an instance of ``RestrictionProvider``.
+
+  The provided ``RestrictionProvider`` instance must provide suitable overrides
+  for the following methods.
+  * create_tracker()
+  * initial_restriction()
+
+  Optionally, ``RestrictionProvider`` may override default implementations of
+  the following methods.
+  * restriction_coder()
+  * split()
+
+  ** Pausing and resuming processing of an element **
+
+  As the last element produced by the iterator returned by the
+  ``DoFn.process()`` method, a Splittable ``DoFn`` may return an object of type
+  ``ProcessContinuation``.
+
+  If provided, the ``ProcessContinuation`` object specifies that the runner
+  should later re-invoke ``DoFn.process()`` to resume processing the current
+  element, and the manner in which the re-invocation should be performed. A
+  ``ProcessContinuation`` object must only be specified as the last element
+  of the iterator. If no ``ProcessContinuation`` object is provided, the
+  runner will assume that the current input element has been fully processed.
+
+  ** Updating output watermark **
+
+  The ``DoFn.process()`` method of a Splittable ``DoFn`` may have a parameter
+  with default value ``DoFn.WatermarkReporterParam``. If specified, this
+  asks the runner to provide a function that can be used to give the runner a
+  (best-effort) lower bound on the timestamps of future output associated
+  with the current element processed by the ``DoFn``. If the ``DoFn`` has
+  multiple outputs, the watermark applies to all of them. The provided
+  function must be invoked with a single parameter, either a ``Timestamp`` or
+  an integer that gives the watermark in number of seconds.
+  """
+
+  def create_tracker(self, restriction):
+    """Produces a new ``RestrictionTracker`` for the given restriction.
+
+    Args:
+      restriction: an object that defines a restriction as identified by a
+        Splittable ``DoFn`` that utilizes the current ``RestrictionProvider``.
+        For example, a tuple that gives a range of positions for a Splittable
+        ``DoFn`` that reads files based on byte positions.
+    Returns: an object of type ``RestrictionTracker``.
+    """
+    raise NotImplementedError
+
+  def initial_restriction(self, element):
+    """Produces an initial restriction for the given element."""
+    raise NotImplementedError
+
+  def split(self, element, restriction):
+    """Splits the given element and restriction.
+
+    Returns an iterator of restrictions. The total set of elements produced by
+    reading the input element for each of the returned restrictions should be
+    the same as the total set of elements produced by reading the input
+    element for the input restriction.
+
+    TODO(chamikara): give suitable hints for performing splitting, for example
+    number of parts or size in bytes.
+    """
+    yield restriction
+
+  def restriction_coder(self):
+    """Returns a ``Coder`` for restrictions.
+
+    The returned ``Coder`` will be used for the restrictions produced by the
+    current ``RestrictionProvider``.
+
+    Returns:
+      an object of type ``Coder``.
+    """
+    return coders.registry.get_coder(object)
+
+
 class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   """A function object used by a transform with custom processing.
 
@@ -153,6 +275,7 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   SideInputParam = 'SideInputParam'
   TimestampParam = 'TimestampParam'
   WindowParam = 'WindowParam'
+  WatermarkReporterParam = 'WatermarkReporterParam'
 
   DoFnParams = [ElementParam, SideInputParam, TimestampParam, WindowParam]
 
@@ -164,13 +287,27 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     return self.__class__.__name__
 
   def process(self, element, *args, **kwargs):
-    """Called for each element of a pipeline. The default arguments are needed
-    for the DoFnRunner to be able to pass the parameters correctly.
+    """Method to use for processing elements.
+
+    This is invoked by ``DoFnRunner`` for each element of an input
+    ``PCollection``.
+
+    If specified, the following default arguments are used by the
+    ``DoFnRunner`` to be able to pass the parameters correctly.
+
+    ``DoFn.ElementParam``: element to be processed.
+    ``DoFn.SideInputParam``: a side input that may be used when processing.
+    ``DoFn.TimestampParam``: timestamp of the input element.
+    ``DoFn.WindowParam``: ``Window`` the input element belongs to.
+    A ``RestrictionProvider`` instance: an ``iobase.RestrictionTracker`` will be
+    provided here to allow treatment as a Splittable ``DoFn``.
+    ``DoFn.WatermarkReporterParam``: a function that can be used to report the
+    output watermark of Splittable ``DoFn`` implementations.
 
     Args:
       element: The element to be processed
       *args: side inputs
-      **kwargs: keyword side inputs
+      **kwargs: other keyword arguments.
     """
     raise NotImplementedError
 


[2/2] beam git commit: This closes #3882

Posted by ch...@apache.org.
This closes #3882


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31da49cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31da49cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31da49cc

Branch: refs/heads/master
Commit: 31da49cc15c9e863d20f3a2e1753ded9b783ff80
Parents: 2090ee3 d1a70a3
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Oct 5 17:15:30 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Oct 5 17:15:30 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/iobase.py       |  72 ++++++++++++
 sdks/python/apache_beam/transforms/core.py | 143 +++++++++++++++++++++++-
 2 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------