You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/06 00:15:44 UTC
[1/2] beam git commit: Adds API for defining Splittable DoFns.
Repository: beam
Updated Branches:
refs/heads/master 2090ee324 -> 31da49cc1
Adds API for defining Splittable DoFns.
See https://s.apache.org/splittable-do-fn-python-sdk for the design.
This PR and the above doc were updated to reflect the following recent updates to Splittable DoFn:
* Support for ProcessContinuations
* Support for dynamically updating the output watermark independently of output element production.
This will be followed by a PR that adds support for reading Splittable DoFns using DirectRunner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1a70a36
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1a70a36
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1a70a36
Branch: refs/heads/master
Commit: d1a70a36cabd2f32ef57b99dc33877826d83cafd
Parents: 2090ee3
Author: chamikara@google.com <ch...@google.com>
Authored: Thu Sep 21 17:43:11 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Oct 5 17:14:56 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 72 ++++++++++++
sdks/python/apache_beam/transforms/core.py | 143 +++++++++++++++++++++++-
2 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d1a70a36/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 1f2a8bf..7cffa7f 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1013,3 +1013,75 @@ class _RoundRobinKeyFn(core.DoFn):
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, element
+
+
+class RestrictionTracker(object):
+ """Manages concurrent access to a restriction.
+
+ Experimental; no backwards-compatibility guarantees.
+
+ Keeps track of the claimed part of a restriction for a Splittable DoFn.
+
+ See the following documents for more details.
+ * https://s.apache.org/splittable-do-fn
+ * https://s.apache.org/splittable-do-fn-python-sdk
+ """
+
+ def current_restriction(self):
+ """Returns the current restriction.
+
+ Returns a restriction accurately describing the full range of work the
+ current ``DoFn.process()`` call will do, including already completed work.
+
+ The current restriction returned by this method may be updated dynamically
+ due to concurrent invocation of other methods of the
+ ``RestrictionTracker``, for example, ``checkpoint()``.
+
+ ** Thread safety **
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
+ """
+ raise NotImplementedError
+
+ def checkpoint(self):
+ """Performs a checkpoint of the current restriction.
+
+ Signals that the current ``DoFn.process()`` call should terminate as soon as
+ possible. After this method returns, the tracker MUST refuse all future
+ claim calls, and ``RestrictionTracker.check_done()`` MUST succeed.
+
+ This invocation modifies the value returned by ``current_restriction()``
+ and returns a restriction representing the rest of the work. The
+ old value of ``current_restriction()`` is equivalent to the new value of
+ ``current_restriction()`` and the return value of this method invocation
+ combined.
+
+ ** Thread safety **
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
+ """
+
+ raise NotImplementedError
+
+ def check_done(self):
+ """Checks whether the restriction has been fully processed.
+
+ Called by the runner after the iterator returned by ``DoFn.process()`` has
+ been fully read.
+
+ Returns: ``True`` if current restriction has been fully processed.
+ Raises ValueError: if there is still any unclaimed work remaining in the
+ restriction when this method is invoked. The exception raised must have an
+ informative error message.
+
+ ** Thread safety **
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
+ """
+ raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/d1a70a36/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 153dc32..41e20ba 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -89,6 +89,8 @@ class DoFnContext(object):
class DoFnProcessContext(DoFnContext):
"""A processing context passed to DoFn process() during execution.
+ Experimental; no backwards-compatibility guarantees.
+
Most importantly, a DoFn.process method will access context.element
to get the element it is supposed to process.
@@ -137,6 +139,126 @@ class DoFnProcessContext(DoFnContext):
self.windows = windowed_value.windows
+class ProcessContinuation(object):
+ """An object that may be produced as the last element of a process method
+ invocation.
+
+ Experimental; no backwards-compatibility guarantees.
+
+ If produced, indicates that there is more work to be done for the current
+ input element.
+ """
+
+ def __init__(self, resume_delay=0):
+ """Initializes a ProcessContinuation object.
+
+ Args:
+ resume_delay: indicates the minimum time, in seconds, that should elapse
+ before re-invoking the process() method to resume processing of the
+ current element.
+ """
+ self.resume_delay = resume_delay
+
+ @staticmethod
+ def resume(resume_delay=0):
+ """A convenience method that produces a ``ProcessContinuation``.
+
+ Args:
+ resume_delay: delay, in seconds, after which processing of the current
+ element should be resumed.
+ Returns: a ``ProcessContinuation`` for signalling the runner that the current
+ input element has not been fully processed and should be resumed later.
+ """
+ return ProcessContinuation(resume_delay=resume_delay)
+
+
+class RestrictionProvider(object):
+ """Provides methods for generating and manipulating restrictions.
+
+ This class should be implemented to support Splittable ``DoFn``s in the
+ Python SDK. See https://s.apache.org/splittable-do-fn for more details about
+ Splittable ``DoFn``s.
+
+ To denote a ``DoFn`` class as a Splittable ``DoFn``, the ``DoFn.process()``
+ method of that class should have exactly one parameter whose default value is
+ an instance of ``RestrictionProvider``.
+
+ The provided ``RestrictionProvider`` instance must provide suitable overrides
+ for the following methods.
+ * create_tracker()
+ * initial_restriction()
+
+ Optionally, ``RestrictionProvider`` may override default implementations of
+ the following methods.
+ * restriction_coder()
+ * split()
+
+ ** Pausing and resuming processing of an element **
+
+ As the last element produced by the iterator returned by the
+ ``DoFn.process()`` method, a Splittable ``DoFn`` may produce an object of type
+ ``ProcessContinuation``.
+
+ If provided, the ``ProcessContinuation`` object specifies that the runner
+ should later re-invoke the ``DoFn.process()`` method to resume processing the
+ current element and the manner in which the re-invocation should be performed.
+ A ``ProcessContinuation`` object must only be specified as the last element of
+ the iterator. If a ``ProcessContinuation`` object is not provided, the runner
+ will assume that the current input element has been fully processed.
+
+ ** Updating output watermark **
+
+ The ``DoFn.process()`` method of a Splittable ``DoFn`` may contain a parameter
+ with default value ``DoFn.WatermarkReporterParam``. If specified, this asks
+ the runner to provide a function that can be used to give the runner a
+ (best-effort) lower bound on the timestamps of future output associated
+ with the current element processed by the ``DoFn``. If the ``DoFn`` has
+ multiple outputs, the watermark applies to all of them. The provided function
+ must be invoked with a single argument, either a ``Timestamp`` or an integer
+ giving the watermark as a number of seconds.
+ """
+
+ def create_tracker(self, restriction):
+ """Produces a new ``RestrictionTracker`` for the given restriction.
+
+ Args:
+ restriction: an object that defines a restriction as identified by a
+ Splittable ``DoFn`` that utilizes the current ``RestrictionProvider``.
+ For example, a tuple that gives a range of positions for a Splittable
+ ``DoFn`` that reads files based on byte positions.
+ Returns: an object of type ``RestrictionTracker``.
+ """
+ raise NotImplementedError
+
+ def initial_restriction(self, element):
+ """Produces an initial restriction for the given element."""
+ raise NotImplementedError
+
+ def split(self, element, restriction):
+ """Splits the given element and restriction.
+
+ Returns an iterator of restrictions. The total set of elements produced by
+ reading the input element for each of the returned restrictions should be
+ the same as the total set of elements produced by reading the input element
+ for the input restriction.
+
+ TODO(chamikara): give suitable hints for performing splitting, for example
+ number of parts or size in bytes.
+ """
+ yield restriction
+
+ def restriction_coder(self):
+ """Returns a ``Coder`` for restrictions.
+
+ The returned ``Coder`` will be used for the restrictions produced by the
+ current ``RestrictionProvider``.
+
+ Returns:
+ an object of type ``Coder``.
+ """
+ return coders.registry.get_coder(object)
+
+
class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""A function object used by a transform with custom processing.
@@ -153,6 +275,7 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
SideInputParam = 'SideInputParam'
TimestampParam = 'TimestampParam'
WindowParam = 'WindowParam'
+ WatermarkReporterParam = 'WatermarkReporterParam'
DoFnParams = [ElementParam, SideInputParam, TimestampParam, WindowParam]
@@ -164,13 +287,27 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
return self.__class__.__name__
def process(self, element, *args, **kwargs):
- """Called for each element of a pipeline. The default arguments are needed
- for the DoFnRunner to be able to pass the parameters correctly.
+ """Method to use for processing elements.
+
+ This is invoked by ``DoFnRunner`` for each element of an input
+ ``PCollection``.
+
+ If specified, the following default arguments are used by the
+ ``DoFnRunner`` to be able to pass the parameters correctly.
+
+ ``DoFn.ElementParam``: element to be processed.
+ ``DoFn.SideInputParam``: a side input that may be used when processing.
+ ``DoFn.TimestampParam``: timestamp of the input element.
+ ``DoFn.WindowParam``: ``Window`` the input element belongs to.
+ A ``RestrictionProvider`` instance: an ``iobase.RestrictionTracker`` will be
+ provided here to allow treatment as a Splittable ``DoFn``.
+ ``DoFn.WatermarkReporterParam``: a function that can be used to report
+ the output watermark of Splittable ``DoFn`` implementations.
Args:
element: The element to be processed
*args: side inputs
- **kwargs: keyword side inputs
+ **kwargs: other keyword arguments.
"""
raise NotImplementedError
[2/2] beam git commit: This closes #3882
Posted by ch...@apache.org.
This closes #3882
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31da49cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31da49cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31da49cc
Branch: refs/heads/master
Commit: 31da49cc15c9e863d20f3a2e1753ded9b783ff80
Parents: 2090ee3 d1a70a3
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Oct 5 17:15:30 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Oct 5 17:15:30 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/iobase.py | 72 ++++++++++++
sdks/python/apache_beam/transforms/core.py | 143 +++++++++++++++++++++++-
2 files changed, 212 insertions(+), 3 deletions(-)
----------------------------------------------------------------------