You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/29 22:43:36 UTC
[beam] branch master updated: [BEAM-3537] Allow more general eager in-process pipeline execution (#4492)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d920644 [BEAM-3537] Allow more general eager in-process pipeline execution (#4492)
d920644 is described below
commit d9206441a16fa39e171684c43ec724cdd80a6ca1
Author: Charles Chen <ch...@users.noreply.github.com>
AuthorDate: Mon Jan 29 14:43:32 2018 -0800
[BEAM-3537] Allow more general eager in-process pipeline execution (#4492)
[BEAM-3537] Allow more general eager in-process pipeline execution
This change also removes the Python DirectRunner-specific PValue cache.
---
.../apache_beam/examples/snippets/snippets_test.py | 31 +++---
sdks/python/apache_beam/runners/__init__.py | 1 -
.../apache_beam/runners/direct/direct_runner.py | 55 ----------
.../runners/direct/evaluation_context.py | 14 ---
sdks/python/apache_beam/runners/direct/executor.py | 11 --
sdks/python/apache_beam/runners/runner.py | 2 +-
sdks/python/apache_beam/transforms/ptransform.py | 119 ++++++++++++++++++---
.../apache_beam/transforms/ptransform_test.py | 16 ++-
8 files changed, 136 insertions(+), 113 deletions(-)
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f05dc39..e731236 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -223,24 +223,27 @@ class ParDoTest(unittest.TestCase):
self.assertEqual({'xyz'}, set(marked))
def test_pardo_with_undeclared_outputs(self):
- numbers = [1, 2, 3, 4, 5, 10, 20]
+ # Note: the use of undeclared outputs is currently not supported in eager
+ # execution mode.
+ with TestPipeline() as p:
+ numbers = p | beam.Create([1, 2, 3, 4, 5, 10, 20])
- # [START model_pardo_with_undeclared_outputs]
- def even_odd(x):
- yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
- if x % 10 == 0:
- yield x
+ # [START model_pardo_with_undeclared_outputs]
+ def even_odd(x):
+ yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
+ if x % 10 == 0:
+ yield x
- results = numbers | beam.FlatMap(even_odd).with_outputs()
+ results = numbers | beam.FlatMap(even_odd).with_outputs()
- evens = results.even
- odds = results.odd
- tens = results[None] # the undeclared main output
- # [END model_pardo_with_undeclared_outputs]
+ evens = results.even
+ odds = results.odd
+ tens = results[None] # the undeclared main output
+ # [END model_pardo_with_undeclared_outputs]
- self.assertEqual({2, 4, 10, 20}, set(evens))
- self.assertEqual({1, 3, 5}, set(odds))
- self.assertEqual({10, 20}, set(tens))
+ assert_that(evens, equal_to([2, 4, 10, 20]), label='assert_even')
+ assert_that(odds, equal_to([1, 3, 5]), label='assert_odds')
+ assert_that(tens, equal_to([10, 20]), label='assert_tens')
class TypeHintsTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index 2b93c30..863e67e 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -21,7 +21,6 @@ This package defines runners, which are used to execute a pipeline.
"""
from apache_beam.runners.direct.direct_runner import DirectRunner
-from apache_beam.runners.direct.direct_runner import EagerRunner
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
from apache_beam.runners.runner import create_runner
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2bd6b45..b18d492 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -23,7 +23,6 @@ graph of transformations belonging to a pipeline on the local machine.
from __future__ import absolute_import
-import collections
import logging
from google.protobuf import wrappers_pb2
@@ -41,7 +40,6 @@ from apache_beam.runners.direct.clock import TestClock
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
from apache_beam.transforms.core import _GroupAlsoByWindow
from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.transforms.ptransform import PTransform
@@ -106,7 +104,6 @@ class DirectRunner(PipelineRunner):
"""Executes a single pipeline on the local machine."""
def __init__(self):
- self._cache = None
self._use_test_clock = False # use RealClock() in production
self._ptransform_overrides = _get_transform_overrides()
@@ -229,8 +226,6 @@ class DirectRunner(PipelineRunner):
self.consumer_tracking_visitor.views,
clock)
- evaluation_context.use_pvalue_cache(self._cache)
-
executor = Executor(self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
@@ -242,53 +237,8 @@ class DirectRunner(PipelineRunner):
executor.start(self.consumer_tracking_visitor.root_transforms)
result = DirectPipelineResult(executor, evaluation_context)
- if self._cache:
- # We are running in eager mode, block until the pipeline execution
- # completes in order to have full results in the cache.
- result.wait_until_finish()
- self._cache.finalize()
-
return result
- @property
- def cache(self):
- if not self._cache:
- self._cache = BufferingInMemoryCache()
- return self._cache.pvalue_cache
-
-
-class BufferingInMemoryCache(object):
- """PValueCache wrapper for buffering bundles until a PValue is fully computed.
-
- BufferingInMemoryCache keeps an in memory cache of
- (applied_ptransform, tag) tuples. It accepts appending to existing cache
- entries until it is finalized. finalize() will make all the existing cached
- entries visible to the underyling PValueCache in their entirety, clean the in
- memory cache and stop accepting new cache entries.
- """
-
- def __init__(self):
- self._cache = collections.defaultdict(list)
- self._pvalue_cache = PValueCache()
- self._finalized = False
-
- @property
- def pvalue_cache(self):
- return self._pvalue_cache
-
- def append(self, applied_ptransform, tag, elements):
- assert not self._finalized
- assert elements is not None
- self._cache[(applied_ptransform, tag)].extend(elements)
-
- def finalize(self):
- """Make buffered cache elements visible to the underlying PValueCache."""
- assert not self._finalized
- for key, value in self._cache.iteritems():
- applied_ptransform, tag = key
- self._pvalue_cache.cache_output(applied_ptransform, tag, value)
- self._cache = None
-
class DirectPipelineResult(PipelineResult):
"""A DirectPipelineResult provides access to info about a pipeline."""
@@ -329,8 +279,3 @@ class DirectPipelineResult(PipelineResult):
def metrics(self):
return self._evaluation_context.metrics()
-
-
-class EagerRunner(DirectRunner):
-
- is_eager = True
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 718dafa..46176c9 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -155,7 +155,6 @@ class EvaluationContext(object):
self._side_inputs_container = _SideInputsContainer(views)
self._pending_unblocked_tasks = []
self._counter_factory = counters.CounterFactory()
- self._cache = None
self._metrics = DirectMetrics()
self._lock = threading.Lock()
@@ -169,23 +168,10 @@ class EvaluationContext(object):
transform_keyed_states[consumer] = {}
return transform_keyed_states
- def use_pvalue_cache(self, cache):
- assert not self._cache
- self._cache = cache
-
def metrics(self):
# TODO. Should this be made a @property?
return self._metrics
- @property
- def has_cache(self):
- return self._cache is not None
-
- def append_to_cache(self, applied_ptransform, tag, elements):
- with self._lock:
- assert self._cache
- self._cache.append(applied_ptransform, tag, elements)
-
def is_root_transform(self, applied_ptransform):
return applied_ptransform in self._root_transforms
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 1cbabc4..d4d9cb5 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -341,17 +341,6 @@ class TransformExecutor(_ExecutorService.CallableTask):
result = evaluator.finish_bundle()
result.logical_metric_updates = metrics_container.get_cumulative()
- if self._evaluation_context.has_cache:
- for uncommitted_bundle in result.uncommitted_output_bundles:
- self._evaluation_context.append_to_cache(
- self._applied_ptransform, uncommitted_bundle.tag,
- uncommitted_bundle.get_elements_iterable())
- undeclared_tag_values = result.undeclared_tag_values
- if undeclared_tag_values:
- for tag, value in undeclared_tag_values.iteritems():
- self._evaluation_context.append_to_cache(
- self._applied_ptransform, tag, value)
-
self._completion_callback.handle_result(self, self._input_bundle, result)
return result
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 78ae4d8..1d0f700 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -45,7 +45,7 @@ _PYTHON_RPC_DIRECT_RUNNER = (
'python_rpc_direct_runner.')
_KNOWN_PYTHON_RPC_DIRECT_RUNNER = ('PythonRPCDirectRunner',)
-_KNOWN_DIRECT_RUNNERS = ('DirectRunner', 'EagerRunner')
+_KNOWN_DIRECT_RUNNERS = ('DirectRunner',)
_KNOWN_DATAFLOW_RUNNERS = ('DataflowRunner',)
_KNOWN_TEST_RUNNERS = ('TestDataflowRunner',)
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 0b6d608..2490495 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -38,9 +38,11 @@ from __future__ import absolute_import
import copy
import inspect
+import itertools
import operator
import os
import sys
+import threading
from functools import reduce
from google.protobuf import wrappers_pb2
@@ -97,29 +99,116 @@ class _SetInputPValues(_PValueishTransform):
return self.visit_nested(node, replacements)
+# Caches to allow for materialization of values when executing a pipeline
+# in-process, in eager mode. This cache allows the same _MaterializedResult
+# object to be accessed and used despite Runner API round-trip serialization.
+_pipeline_materialization_cache = {}
+_pipeline_materialization_lock = threading.Lock()
+
+
+def _allocate_materialized_pipeline(pipeline):
+ pid = os.getpid()
+ with _pipeline_materialization_lock:
+ pipeline_id = id(pipeline)
+ _pipeline_materialization_cache[(pid, pipeline_id)] = {}
+
+
+def _allocate_materialized_result(pipeline):
+ pid = os.getpid()
+ with _pipeline_materialization_lock:
+ pipeline_id = id(pipeline)
+ if (pid, pipeline_id) not in _pipeline_materialization_cache:
+ raise ValueError('Materialized pipeline is not allocated for result '
+ 'cache.')
+ result_id = len(_pipeline_materialization_cache[(pid, pipeline_id)])
+ result = _MaterializedResult(pipeline_id, result_id)
+ _pipeline_materialization_cache[(pid, pipeline_id)][result_id] = result
+ return result
+
+
+def _get_materialized_result(pipeline_id, result_id):
+ pid = os.getpid()
+ with _pipeline_materialization_lock:
+ if (pid, pipeline_id) not in _pipeline_materialization_cache:
+ raise Exception(
+ 'Materialization in out-of-process and remote runners is not yet '
+ 'supported.')
+ return _pipeline_materialization_cache[(pid, pipeline_id)][result_id]
+
+
+def _release_materialized_pipeline(pipeline):
+ pid = os.getpid()
+ with _pipeline_materialization_lock:
+ pipeline_id = id(pipeline)
+ del _pipeline_materialization_cache[(pid, pipeline_id)]
+
+
+class _MaterializedResult(object):
+ def __init__(self, pipeline_id, result_id):
+ self._pipeline_id = pipeline_id
+ self._result_id = result_id
+ self.elements = []
+
+ def __reduce__(self):
+ # When unpickled (during Runner API roundtrip serailization), get the
+ # _MaterializedResult object from the cache so that values are written
+ # to the original _MaterializedResult when run in eager mode.
+ return (_get_materialized_result, (self._pipeline_id, self._result_id))
+
+
class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple):
- def __init__(self, deferred, pvalue_cache):
+ def __init__(self, deferred, results_by_tag):
super(_MaterializedDoOutputsTuple, self).__init__(
None, None, deferred._tags, deferred._main_tag)
self._deferred = deferred
- self._pvalue_cache = pvalue_cache
+ self._results_by_tag = results_by_tag
def __getitem__(self, tag):
- # Simply accessing the value should not use it up.
- return self._pvalue_cache.get_unwindowed_pvalue(
- self._deferred[tag], decref=False)
+ if tag not in self._results_by_tag:
+ raise KeyError(
+ 'Tag %r is not a a defined output tag of %s.' % (
+ tag, self._deferred))
+ return self._results_by_tag[tag].elements
+
+
+class _AddMaterializationTransforms(_PValueishTransform):
+ def _materialize_transform(self, pipeline):
+ result = _allocate_materialized_result(pipeline)
-class _MaterializePValues(_PValueishTransform):
- def __init__(self, pvalue_cache):
- self._pvalue_cache = pvalue_cache
+ # Need to define _MaterializeValuesDoFn here to avoid circular
+ # dependencies.
+ from apache_beam import DoFn
+ from apache_beam import ParDo
+
+ class _MaterializeValuesDoFn(DoFn):
+ def process(self, element):
+ result.elements.append(element)
+
+ materialization_label = '_MaterializeValues%d' % result._result_id
+ return (materialization_label >> ParDo(_MaterializeValuesDoFn()),
+ result)
def visit(self, node):
if isinstance(node, pvalue.PValue):
- # Simply accessing the value should not use it up.
- return self._pvalue_cache.get_unwindowed_pvalue(node, decref=False)
+ transform, result = self._materialize_transform(node.pipeline)
+ node | transform
+ return result
elif isinstance(node, pvalue.DoOutputsTuple):
- return _MaterializedDoOutputsTuple(node, self._pvalue_cache)
+ results_by_tag = {}
+ for tag in itertools.chain([node._main_tag], node._tags):
+ results_by_tag[tag] = self.visit(node[tag])
+ return _MaterializedDoOutputsTuple(node, results_by_tag)
+ else:
+ return self.visit_nested(node)
+
+
+class _FinalizeMaterialization(_PValueishTransform):
+ def visit(self, node):
+ if isinstance(node, _MaterializedResult):
+ return node.elements
+ elif isinstance(node, _MaterializedDoOutputsTuple):
+ return node
else:
return self.visit_nested(node)
@@ -399,11 +488,11 @@ class PTransform(WithTypeHints, HasDisplayData):
result = p.apply(self, pvalueish, label)
if deferred:
return result
- # Get a reference to the runners internal cache, otherwise runner may
- # clean it after run.
- cache = p.runner.cache
+ _allocate_materialized_pipeline(p)
+ materialized_result = _AddMaterializationTransforms().visit(result)
p.run().wait_until_finish()
- return _MaterializePValues(cache).visit(result)
+ _release_materialized_pipeline(p)
+ return _FinalizeMaterialization().visit(materialized_result)
def _extract_input_pvalues(self, pvalueish):
"""Extract all the pvalues contained in the input pvalueish.
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 09ac72b..1f01c9c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -1407,7 +1407,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"Valid object instance must be of type 'tuple'. Instead, "
"an instance of 'float' was received.")
- def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self):
+ def test_pipeline_runtime_checking_violation_with_side_inputs_decorator(self):
self.p._options.view_as(TypeOptions).pipeline_type_check = False
self.p._options.view_as(TypeOptions).runtime_type_check = True
@@ -1427,7 +1427,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
"Expected an instance of <type 'int'>, "
"instead found 1.0, an instance of <type 'float'>.")
- def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self):
+ def test_pipeline_runtime_checking_violation_with_side_inputs_via_method(self): # pylint: disable=line-too-long
self.p._options.view_as(TypeOptions).runtime_type_check = True
self.p._options.view_as(TypeOptions).pipeline_type_check = False
@@ -2062,6 +2062,18 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
x = self.p | 'C2' >> beam.Create([1, 2, 3, 4])
self.assertEqual(int, x.element_type)
+ def test_eager_execution(self):
+ doubled = [1, 2, 3, 4] | beam.Map(lambda x: 2 * x)
+ self.assertEqual([2, 4, 6, 8], doubled)
+
+ def test_eager_execution_tagged_outputs(self):
+ result = [1, 2, 3, 4] | beam.Map(
+ lambda x: pvalue.TaggedOutput('bar', 2 * x)).with_outputs('bar')
+ self.assertEqual([2, 4, 6, 8], result.bar)
+ with self.assertRaises(KeyError,
+ msg='Tag \'foo\' is not a defined output tag'):
+ result.foo
+
if __name__ == '__main__':
unittest.main()
--
To stop receiving notification emails like this one, please contact
robertwb@apache.org.