You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/29 22:43:36 UTC

[beam] branch master updated: [BEAM-3537] Allow more general eager in-process pipeline execution (#4492)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d920644  [BEAM-3537] Allow more general eager in-process pipeline execution (#4492)
d920644 is described below

commit d9206441a16fa39e171684c43ec724cdd80a6ca1
Author: Charles Chen <ch...@users.noreply.github.com>
AuthorDate: Mon Jan 29 14:43:32 2018 -0800

    [BEAM-3537] Allow more general eager in-process pipeline execution (#4492)
    
    [BEAM-3537] Allow more general eager in-process pipeline execution
    
    This change also removes the Python DirectRunner-specific PValue cache.
---
 .../apache_beam/examples/snippets/snippets_test.py |  31 +++---
 sdks/python/apache_beam/runners/__init__.py        |   1 -
 .../apache_beam/runners/direct/direct_runner.py    |  55 ----------
 .../runners/direct/evaluation_context.py           |  14 ---
 sdks/python/apache_beam/runners/direct/executor.py |  11 --
 sdks/python/apache_beam/runners/runner.py          |   2 +-
 sdks/python/apache_beam/transforms/ptransform.py   | 119 ++++++++++++++++++---
 .../apache_beam/transforms/ptransform_test.py      |  16 ++-
 8 files changed, 136 insertions(+), 113 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f05dc39..e731236 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -223,24 +223,27 @@ class ParDoTest(unittest.TestCase):
     self.assertEqual({'xyz'}, set(marked))
 
   def test_pardo_with_undeclared_outputs(self):
-    numbers = [1, 2, 3, 4, 5, 10, 20]
+    # Note: the use of undeclared outputs is currently not supported in eager
+    # execution mode.
+    with TestPipeline() as p:
+      numbers = p | beam.Create([1, 2, 3, 4, 5, 10, 20])
 
-    # [START model_pardo_with_undeclared_outputs]
-    def even_odd(x):
-      yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
-      if x % 10 == 0:
-        yield x
+      # [START model_pardo_with_undeclared_outputs]
+      def even_odd(x):
+        yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
+        if x % 10 == 0:
+          yield x
 
-    results = numbers | beam.FlatMap(even_odd).with_outputs()
+      results = numbers | beam.FlatMap(even_odd).with_outputs()
 
-    evens = results.even
-    odds = results.odd
-    tens = results[None]  # the undeclared main output
-    # [END model_pardo_with_undeclared_outputs]
+      evens = results.even
+      odds = results.odd
+      tens = results[None]  # the undeclared main output
+      # [END model_pardo_with_undeclared_outputs]
 
-    self.assertEqual({2, 4, 10, 20}, set(evens))
-    self.assertEqual({1, 3, 5}, set(odds))
-    self.assertEqual({10, 20}, set(tens))
+      assert_that(evens, equal_to([2, 4, 10, 20]), label='assert_even')
+      assert_that(odds, equal_to([1, 3, 5]), label='assert_odds')
+      assert_that(tens, equal_to([10, 20]), label='assert_tens')
 
 
 class TypeHintsTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 2b93c30..863e67e 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -21,7 +21,6 @@ This package defines runners, which are used to execute a pipeline.
 """
 
 from apache_beam.runners.direct.direct_runner import DirectRunner
-from apache_beam.runners.direct.direct_runner import EagerRunner
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import create_runner
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2bd6b45..b18d492 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -23,7 +23,6 @@ graph of transformations belonging to a pipeline on the local machine.
 
 from __future__ import absolute_import
 
-import collections
 import logging
 
 from google.protobuf import wrappers_pb2
@@ -41,7 +40,6 @@ from apache_beam.runners.direct.clock import TestClock
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
 from apache_beam.transforms.core import _GroupAlsoByWindow
 from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.transforms.ptransform import PTransform
@@ -106,7 +104,6 @@ class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
   def __init__(self):
-    self._cache = None
     self._use_test_clock = False  # use RealClock() in production
     self._ptransform_overrides = _get_transform_overrides()
 
@@ -229,8 +226,6 @@ class DirectRunner(PipelineRunner):
         self.consumer_tracking_visitor.views,
         clock)
 
-    evaluation_context.use_pvalue_cache(self._cache)
-
     executor = Executor(self.consumer_tracking_visitor.value_to_consumers,
                         TransformEvaluatorRegistry(evaluation_context),
                         evaluation_context)
@@ -242,53 +237,8 @@ class DirectRunner(PipelineRunner):
     executor.start(self.consumer_tracking_visitor.root_transforms)
     result = DirectPipelineResult(executor, evaluation_context)
 
-    if self._cache:
-      # We are running in eager mode, block until the pipeline execution
-      # completes in order to have full results in the cache.
-      result.wait_until_finish()
-      self._cache.finalize()
-
     return result
 
-  @property
-  def cache(self):
-    if not self._cache:
-      self._cache = BufferingInMemoryCache()
-    return self._cache.pvalue_cache
-
-
-class BufferingInMemoryCache(object):
-  """PValueCache wrapper for buffering bundles until a PValue is fully computed.
-
-  BufferingInMemoryCache keeps an in memory cache of
-  (applied_ptransform, tag) tuples. It accepts appending to existing cache
-  entries until it is finalized. finalize() will make all the existing cached
-  entries visible to the underyling PValueCache in their entirety, clean the in
-  memory cache and stop accepting new cache entries.
-  """
-
-  def __init__(self):
-    self._cache = collections.defaultdict(list)
-    self._pvalue_cache = PValueCache()
-    self._finalized = False
-
-  @property
-  def pvalue_cache(self):
-    return self._pvalue_cache
-
-  def append(self, applied_ptransform, tag, elements):
-    assert not self._finalized
-    assert elements is not None
-    self._cache[(applied_ptransform, tag)].extend(elements)
-
-  def finalize(self):
-    """Make buffered cache elements visible to the underlying PValueCache."""
-    assert not self._finalized
-    for key, value in self._cache.iteritems():
-      applied_ptransform, tag = key
-      self._pvalue_cache.cache_output(applied_ptransform, tag, value)
-    self._cache = None
-
 
 class DirectPipelineResult(PipelineResult):
   """A DirectPipelineResult provides access to info about a pipeline."""
@@ -329,8 +279,3 @@ class DirectPipelineResult(PipelineResult):
 
   def metrics(self):
     return self._evaluation_context.metrics()
-
-
-class EagerRunner(DirectRunner):
-
-  is_eager = True
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 718dafa..46176c9 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -155,7 +155,6 @@ class EvaluationContext(object):
     self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
-    self._cache = None
     self._metrics = DirectMetrics()
 
     self._lock = threading.Lock()
@@ -169,23 +168,10 @@ class EvaluationContext(object):
         transform_keyed_states[consumer] = {}
     return transform_keyed_states
 
-  def use_pvalue_cache(self, cache):
-    assert not self._cache
-    self._cache = cache
-
   def metrics(self):
     # TODO. Should this be made a @property?
     return self._metrics
 
-  @property
-  def has_cache(self):
-    return self._cache is not None
-
-  def append_to_cache(self, applied_ptransform, tag, elements):
-    with self._lock:
-      assert self._cache
-      self._cache.append(applied_ptransform, tag, elements)
-
   def is_root_transform(self, applied_ptransform):
     return applied_ptransform in self._root_transforms
 
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 1cbabc4..d4d9cb5 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -341,17 +341,6 @@ class TransformExecutor(_ExecutorService.CallableTask):
       result = evaluator.finish_bundle()
       result.logical_metric_updates = metrics_container.get_cumulative()
 
-    if self._evaluation_context.has_cache:
-      for uncommitted_bundle in result.uncommitted_output_bundles:
-        self._evaluation_context.append_to_cache(
-            self._applied_ptransform, uncommitted_bundle.tag,
-            uncommitted_bundle.get_elements_iterable())
-      undeclared_tag_values = result.undeclared_tag_values
-      if undeclared_tag_values:
-        for tag, value in undeclared_tag_values.iteritems():
-          self._evaluation_context.append_to_cache(
-              self._applied_ptransform, tag, value)
-
     self._completion_callback.handle_result(self, self._input_bundle, result)
     return result
 
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 78ae4d8..1d0f700 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -45,7 +45,7 @@ _PYTHON_RPC_DIRECT_RUNNER = (
     'python_rpc_direct_runner.')
 
 _KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',)
-_KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
+_KNOWN_DIRECT_RUNNERS = ('DirectRunner',)
 _KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',)
 _KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
 
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 0b6d608..2490495 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -38,9 +38,11 @@ from __future__ import absolute_import
 
 import copy
 import inspect
+import itertools
 import operator
 import os
 import sys
+import threading
 from functools import reduce
 
 from google.protobuf import wrappers_pb2
@@ -97,29 +99,116 @@ class _SetInputPValues(_PValueishTransform):
       return self.visit_nested(node, replacements)
 
 
+# Caches to allow for materialization of values when executing a pipeline
+# in-process, in eager mode.  This cache allows the same _MaterializedResult
+# object to be accessed and used despite Runner API round-trip serialization.
+_pipeline_materialization_cache = {}
+_pipeline_materialization_lock = threading.Lock()
+
+
+def _allocate_materialized_pipeline(pipeline):
+  pid = os.getpid()
+  with _pipeline_materialization_lock:
+    pipeline_id = id(pipeline)
+    _pipeline_materialization_cache[(pid, pipeline_id)] = {}
+
+
+def _allocate_materialized_result(pipeline):
+  pid = os.getpid()
+  with _pipeline_materialization_lock:
+    pipeline_id = id(pipeline)
+    if (pid, pipeline_id) not in _pipeline_materialization_cache:
+      raise ValueError('Materialized pipeline is not allocated for result '
+                       'cache.')
+    result_id = len(_pipeline_materialization_cache[(pid, pipeline_id)])
+    result = _MaterializedResult(pipeline_id, result_id)
+    _pipeline_materialization_cache[(pid, pipeline_id)][result_id] = result
+    return result
+
+
+def _get_materialized_result(pipeline_id, result_id):
+  pid = os.getpid()
+  with _pipeline_materialization_lock:
+    if (pid, pipeline_id) not in _pipeline_materialization_cache:
+      raise Exception(
+          'Materialization in out-of-process and remote runners is not yet '
+          'supported.')
+    return _pipeline_materialization_cache[(pid, pipeline_id)][result_id]
+
+
+def _release_materialized_pipeline(pipeline):
+  pid = os.getpid()
+  with _pipeline_materialization_lock:
+    pipeline_id = id(pipeline)
+    del _pipeline_materialization_cache[(pid, pipeline_id)]
+
+
+class _MaterializedResult(object):
+  def __init__(self, pipeline_id, result_id):
+    self._pipeline_id = pipeline_id
+    self._result_id = result_id
+    self.elements = []
+
+  def __reduce__(self):
+    # When unpickled (during Runner API round-trip serialization), get the
+    # _MaterializedResult object from the cache so that values are written
+    # to the original _MaterializedResult when run in eager mode.
+    return (_get_materialized_result, (self._pipeline_id, self._result_id))
+
+
 class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple):
-  def __init__(self, deferred, pvalue_cache):
+  def __init__(self, deferred, results_by_tag):
     super(_MaterializedDoOutputsTuple, self).__init__(
         None, None, deferred._tags, deferred._main_tag)
     self._deferred = deferred
-    self._pvalue_cache = pvalue_cache
+    self._results_by_tag = results_by_tag
 
   def __getitem__(self, tag):
-    # Simply accessing the value should not use it up.
-    return self._pvalue_cache.get_unwindowed_pvalue(
-        self._deferred[tag], decref=False)
+    if tag not in self._results_by_tag:
+      raise KeyError(
+          'Tag %r is not a a defined output tag of %s.' % (
+              tag, self._deferred))
+    return self._results_by_tag[tag].elements
+
+
+class _AddMaterializationTransforms(_PValueishTransform):
 
+  def _materialize_transform(self, pipeline):
+    result = _allocate_materialized_result(pipeline)
 
-class _MaterializePValues(_PValueishTransform):
-  def __init__(self, pvalue_cache):
-    self._pvalue_cache = pvalue_cache
+    # Need to define _MaterializeValuesDoFn here to avoid circular
+    # dependencies.
+    from apache_beam import DoFn
+    from apache_beam import ParDo
+
+    class _MaterializeValuesDoFn(DoFn):
+      def process(self, element):
+        result.elements.append(element)
+
+    materialization_label = '_MaterializeValues%d' % result._result_id
+    return (materialization_label >> ParDo(_MaterializeValuesDoFn()),
+            result)
 
   def visit(self, node):
     if isinstance(node, pvalue.PValue):
-      # Simply accessing the value should not use it up.
-      return self._pvalue_cache.get_unwindowed_pvalue(node, decref=False)
+      transform, result = self._materialize_transform(node.pipeline)
+      node | transform
+      return result
     elif isinstance(node, pvalue.DoOutputsTuple):
-      return _MaterializedDoOutputsTuple(node, self._pvalue_cache)
+      results_by_tag = {}
+      for tag in itertools.chain([node._main_tag], node._tags):
+        results_by_tag[tag] = self.visit(node[tag])
+      return _MaterializedDoOutputsTuple(node, results_by_tag)
+    else:
+      return self.visit_nested(node)
+
+
+class _FinalizeMaterialization(_PValueishTransform):
+  def visit(self, node):
+    if isinstance(node, _MaterializedResult):
+      return node.elements
+    elif isinstance(node, _MaterializedDoOutputsTuple):
+      return node
     else:
       return self.visit_nested(node)
 
@@ -399,11 +488,11 @@ class PTransform(WithTypeHints, HasDisplayData):
     result = p.apply(self, pvalueish, label)
     if deferred:
       return result
-    # Get a reference to the runners internal cache, otherwise runner may
-    # clean it after run.
-    cache = p.runner.cache
+    _allocate_materialized_pipeline(p)
+    materialized_result = _AddMaterializationTransforms().visit(result)
     p.run().wait_until_finish()
-    return _MaterializePValues(cache).visit(result)
+    _release_materialized_pipeline(p)
+    return _FinalizeMaterialization().visit(materialized_result)
 
   def _extract_input_pvalues(self, pvalueish):
     """Extract all the pvalues contained in the input pvalueish.
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 09ac72b..1f01c9c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1407,7 +1407,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "Valid object instance must be of type 'tuple'. Instead, "
         "an instance of 'float' was received.")
 
-  def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self):
+  def test_pipeline_runtime_checking_violation_with_side_inputs_decorator(self):
     self.p._options.view_as(TypeOptions).pipeline_type_check = False
     self.p._options.view_as(TypeOptions).runtime_type_check = True
 
@@ -1427,7 +1427,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         "Expected an instance of <type 'int'>, "
         "instead found 1.0, an instance of <type 'float'>.")
 
-  def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self):
+  def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self):  # pylint: disable=line-too-long
     self.p._options.view_as(TypeOptions).runtime_type_check = True
     self.p._options.view_as(TypeOptions).pipeline_type_check = False
 
@@ -2062,6 +2062,18 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 
+  def test_eager_execution(self):
+    doubled = [1, 2, 3, 4] | beam.Map(lambda x: 2 * x)
+    self.assertEqual([2, 4, 6, 8], doubled)
+
+  def test_eager_execution_tagged_outputs(self):
+    result = [1, 2, 3, 4] | beam.Map(
+        lambda x: pvalue.TaggedOutput('bar', 2 * x)).with_outputs('bar')
+    self.assertEqual([2, 4, 6, 8], result.bar)
+    with self.assertRaises(KeyError,
+                           msg='Tag \'foo\' is not a defined output tag'):
+      result.foo
+
 
 if __name__ == '__main__':
   unittest.main()

-- 
To stop receiving notification emails like this one, please contact
robertwb@apache.org.