You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this conversion.
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/16 16:57:26 UTC
[1/2] beam git commit: Introduce pending bundles and RootBundleProvider in DirectRunner
Repository: beam
Updated Branches:
refs/heads/master f2a32b2ca -> 54f307891
Introduce pending bundles and RootBundleProvider in DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5aee624c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5aee624c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5aee624c
Branch: refs/heads/master
Commit: 5aee624cbc2815efaf04c7e4854138370a45a1f6
Parents: f2a32b2
Author: Charles Chen <cc...@google.com>
Authored: Thu Jun 15 14:27:47 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jun 16 09:56:51 2017 -0700
----------------------------------------------------------------------
.../runners/direct/bundle_factory.py | 2 +-
.../apache_beam/runners/direct/executor.py | 64 ++++++++++++--------
.../runners/direct/transform_evaluator.py | 39 +++++++++++-
3 files changed, 77 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index ed00b03..0182b4c 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -108,7 +108,7 @@ class _Bundle(object):
self._initial_windowed_value.windows)
def __init__(self, pcollection, stacked=True):
- assert isinstance(pcollection, pvalue.PCollection)
+ assert isinstance(pcollection, (pvalue.PBegin, pvalue.PCollection))
self._pcollection = pcollection
self._elements = []
self._stacked = stacked
http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 86db291..eff2d3c 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -20,6 +20,7 @@
from __future__ import absolute_import
import collections
+import itertools
import logging
import Queue
import sys
@@ -250,12 +251,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
"""
def __init__(self, transform_evaluator_registry, evaluation_context,
- input_bundle, applied_transform, completion_callback,
+ input_bundle, applied_ptransform, completion_callback,
transform_evaluation_state):
self._transform_evaluator_registry = transform_evaluator_registry
self._evaluation_context = evaluation_context
self._input_bundle = input_bundle
- self._applied_transform = applied_transform
+ self._applied_ptransform = applied_ptransform
self._completion_callback = completion_callback
self._transform_evaluation_state = transform_evaluation_state
self._side_input_values = {}
@@ -264,11 +265,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
def call(self):
self._call_count += 1
- assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
- metrics_container = MetricsContainer(self._applied_transform.full_label)
+ assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
+ metrics_container = MetricsContainer(self._applied_ptransform.full_label)
scoped_metrics_container = ScopedMetricsContainer(metrics_container)
- for side_input in self._applied_transform.side_inputs:
+ for side_input in self._applied_ptransform.side_inputs:
if side_input not in self._side_input_values:
has_result, value = (
self._evaluation_context.get_value_or_schedule_after_output(
@@ -280,11 +281,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
self._side_input_values[side_input] = value
side_input_values = [self._side_input_values[side_input]
- for side_input in self._applied_transform.side_inputs]
+ for side_input in self._applied_ptransform.side_inputs]
try:
- evaluator = self._transform_evaluator_registry.for_application(
- self._applied_transform, self._input_bundle,
+ evaluator = self._transform_evaluator_registry.get_evaluator(
+ self._applied_ptransform, self._input_bundle,
side_input_values, scoped_metrics_container)
if self._input_bundle:
@@ -298,13 +299,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
if self._evaluation_context.has_cache:
for uncommitted_bundle in result.uncommitted_output_bundles:
self._evaluation_context.append_to_cache(
- self._applied_transform, uncommitted_bundle.tag,
+ self._applied_ptransform, uncommitted_bundle.tag,
uncommitted_bundle.get_elements_iterable())
undeclared_tag_values = result.undeclared_tag_values
if undeclared_tag_values:
for tag, value in undeclared_tag_values.iteritems():
self._evaluation_context.append_to_cache(
- self._applied_transform, tag, value)
+ self._applied_ptransform, tag, value)
self._completion_callback.handle_result(self._input_bundle, result)
return result
@@ -353,6 +354,15 @@ class _ExecutorServiceParallelExecutor(object):
def start(self, roots):
self.root_nodes = frozenset(roots)
+ self.all_nodes = frozenset(
+ itertools.chain(
+ roots,
+ *itertools.chain(self.value_to_consumers.values())))
+ self.node_to_pending_bundles = {}
+ for root_node in self.root_nodes:
+ provider = (self.transform_evaluator_registry
+ .get_root_bundle_provider(root_node))
+ self.node_to_pending_bundles[root_node] = provider.get_root_bundles()
self.executor_service.submit(
_ExecutorServiceParallelExecutor._MonitorTask(self))
@@ -372,22 +382,22 @@ class _ExecutorServiceParallelExecutor(object):
self.schedule_consumption(applied_ptransform, committed_bundle,
self.default_completion_callback)
- def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+ def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
on_complete):
"""Schedules evaluation of the given bundle with the transform."""
- assert all([consumer_applied_transform, on_complete])
- assert committed_bundle or consumer_applied_transform in self.root_nodes
- if (committed_bundle
- and self.transform_evaluator_registry.should_execute_serially(
- consumer_applied_transform)):
+ assert consumer_applied_ptransform
+ assert committed_bundle
+ assert on_complete
+ if self.transform_evaluator_registry.should_execute_serially(
+ consumer_applied_ptransform):
transform_executor_service = self.transform_executor_services.serial(
- consumer_applied_transform)
+ consumer_applied_ptransform)
else:
transform_executor_service = self.transform_executor_services.parallel()
transform_executor = TransformExecutor(
self.transform_evaluator_registry, self.evaluation_context,
- committed_bundle, consumer_applied_transform, on_complete,
+ committed_bundle, consumer_applied_ptransform, on_complete,
transform_executor_service)
transform_executor_service.schedule(transform_executor)
@@ -564,10 +574,14 @@ class _ExecutorServiceParallelExecutor(object):
# additional work.
return
- # All current TransformExecutors are blocked; add more work from the
- # roots.
- for applied_transform in self._executor.root_nodes:
- if not self._executor.evaluation_context.is_done(applied_transform):
- self._executor.schedule_consumption(
- applied_transform, None,
- self._executor.default_completion_callback)
+ # All current TransformExecutors are blocked; add more work from any
+ # pending bundles.
+ for applied_ptransform in self._executor.all_nodes:
+ if not self._executor.evaluation_context.is_done(applied_ptransform):
+ pending_bundles = self._executor.node_to_pending_bundles.get(
+ applied_ptransform, [])
+ for bundle in pending_bundles:
+ self._executor.schedule_consumption(
+ applied_ptransform, bundle,
+ self._executor.default_completion_callback)
+ self._executor.node_to_pending_bundles[applied_ptransform] = []
http://git-wip-us.apache.org/repos/asf/beam/blob/5aee624c/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index f5b5db5..6e73561 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -58,8 +58,11 @@ class TransformEvaluatorRegistry(object):
core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
_NativeWrite: _NativeWriteEvaluator,
}
+ self._root_bundle_providers = {
+ core.PTransform: DefaultRootBundleProvider,
+ }
- def for_application(
+ def get_evaluator(
self, applied_ptransform, input_committed_bundle,
side_inputs, scoped_metrics_container):
"""Returns a TransformEvaluator suitable for processing given inputs."""
@@ -81,6 +84,18 @@ class TransformEvaluatorRegistry(object):
input_committed_bundle, side_inputs,
scoped_metrics_container)
+ def get_root_bundle_provider(self, applied_ptransform):
+ provider_cls = None
+ for cls in applied_ptransform.transform.__class__.mro():
+ provider_cls = self._root_bundle_providers.get(cls)
+ if provider_cls:
+ break
+ if not provider_cls:
+ raise NotImplementedError(
+ 'Root provider for [%s] not implemented in runner %s' % (
+ type(applied_ptransform.transform), self))
+ return provider_cls(self._evaluation_context, applied_ptransform)
+
def should_execute_serially(self, applied_ptransform):
"""Returns True if this applied_ptransform should run one bundle at a time.
@@ -104,6 +119,27 @@ class TransformEvaluatorRegistry(object):
(core._GroupByKeyOnly, _NativeWrite))
+class RootBundleProvider(object):
+ """Provides bundles for the initial execution of a root transform."""
+
+ def __init__(self, evaluation_context, applied_ptransform):
+ self._evaluation_context = evaluation_context
+ self._applied_ptransform = applied_ptransform
+
+ def get_root_bundles(self):
+ raise NotImplementedError
+
+
+class DefaultRootBundleProvider(RootBundleProvider):
+ """Provides an empty bundle by default for root transforms."""
+
+ def get_root_bundles(self):
+ input_node = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
+ empty_bundle = (
+ self._evaluation_context.create_empty_committed_bundle(input_node))
+ return [empty_bundle]
+
+
class _TransformEvaluator(object):
"""An evaluator of a specific application of a transform."""
@@ -180,7 +216,6 @@ class _BoundedReadEvaluator(_TransformEvaluator):
def __init__(self, evaluation_context, applied_ptransform,
input_committed_bundle, side_inputs, scoped_metrics_container):
- assert not input_committed_bundle
assert not side_inputs
self._source = applied_ptransform.transform.source
self._source.pipeline_options = evaluation_context.pipeline_options
[2/2] beam git commit: This closes #3370
Posted by al...@apache.org.
This closes #3370
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54f30789
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54f30789
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54f30789
Branch: refs/heads/master
Commit: 54f30789165a49d92479b54cf3dac9d4a063e366
Parents: f2a32b2 5aee624
Author: Ahmet Altay <al...@google.com>
Authored: Fri Jun 16 09:57:16 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Jun 16 09:57:16 2017 -0700
----------------------------------------------------------------------
.../runners/direct/bundle_factory.py | 2 +-
.../apache_beam/runners/direct/executor.py | 64 ++++++++++++--------
.../runners/direct/transform_evaluator.py | 39 +++++++++++-
3 files changed, 77 insertions(+), 28 deletions(-)
----------------------------------------------------------------------